Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reinstate HttpChannel reuse in H2 #10868

Merged
merged 5 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.util.Base64;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;

import org.eclipse.jetty.http.BadMessageException;
Expand Down Expand Up @@ -60,6 +62,8 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
private static final Logger LOG = LoggerFactory.getLogger(HTTP2ServerConnection.class);

private final HttpChannel.Factory httpChannelFactory = new HttpChannel.DefaultFactory();
// This unbounded queue will always be limited by the max number of concurrent streams per connection.
private final Queue<HttpChannel> httpChannels = new ConcurrentLinkedQueue<>();
sbordet marked this conversation as resolved.
Show resolved Hide resolved
private final Attributes attributes = new Lazy();
private final List<Frame> upgradeFrames = new ArrayList<>();
private final Connector connector;
Expand All @@ -68,6 +72,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
private final String id;
private final SocketAddress localSocketAddress;
private final SocketAddress remoteSocketAddress;
private boolean recycleHttpChannels;

public HTTP2ServerConnection(Connector connector, EndPoint endPoint, HttpConfiguration httpConfig, HTTP2ServerSession session, int inputBufferSize, ServerSessionListener listener)
{
Expand All @@ -78,6 +83,17 @@ public HTTP2ServerConnection(Connector connector, EndPoint endPoint, HttpConfigu
this.id = StringUtil.randomAlphaNumeric(16);
localSocketAddress = httpConfig.getLocalAddress() != null ? httpConfig.getLocalAddress() : endPoint.getLocalSocketAddress();
remoteSocketAddress = endPoint.getRemoteSocketAddress();
setRecycleHttpChannels(true);
}

public boolean isRecycleHttpChannels()
{
return recycleHttpChannels;
}

public void setRecycleHttpChannels(boolean recycleHttpChannels)
{
this.recycleHttpChannels = recycleHttpChannels;
}

@Override
Expand Down Expand Up @@ -122,7 +138,7 @@ public void onNewStream(HTTP2Stream stream, HeadersFrame frame)
if (LOG.isDebugEnabled())
LOG.debug("Processing {} on {}", frame, stream);

HttpChannel httpChannel = httpChannelFactory.newHttpChannel(this);
HttpChannel httpChannel = pollHttpChannel();
HttpStreamOverHTTP2 httpStream = new HttpStreamOverHTTP2(this, httpChannel, stream);
httpChannel.setHttpStream(httpStream);
stream.setAttachment(httpStream);
Expand Down Expand Up @@ -227,83 +243,29 @@ public void push(HTTP2Stream stream, MetaData.Request request)
if (LOG.isDebugEnabled())
LOG.debug("Processing push {} on {}", request, stream);

HttpChannel httpChannel = httpChannelFactory.newHttpChannel(this);
HttpChannel httpChannel = pollHttpChannel();
HttpStreamOverHTTP2 httpStream = new HttpStreamOverHTTP2(this, httpChannel, stream);
httpChannel.setHttpStream(httpStream);
Runnable task = httpStream.onPushRequest(request);
if (task != null)
offerTask(task, true);
}

/*
// TODO: re-instate recycle channel functionality, but using Pool.
private final AutoLock lock = new AutoLock();
private final Queue<HttpChannelOverHTTP2> channels = new ArrayDeque<>();
private boolean recycleHttpChannels = true;
public boolean isRecycleHttpChannels()
{
return recycleHttpChannels;
}
public void setRecycleHttpChannels(boolean recycleHttpChannels)
{
this.recycleHttpChannels = recycleHttpChannels;
}
private HttpChannelOverHTTP2 provideHttpChannel(Connector connector, IStream stream)
{
HttpChannelOverHTTP2 channel = pollHttpChannel();
if (channel != null)
{
channel.getHttpTransport().setStream(stream);
if (LOG.isDebugEnabled())
LOG.debug("Recycling channel {} for {}", channel, this);
}
else
{
HttpTransportOverHTTP2 transport = new HttpTransportOverHTTP2(connector, this);
transport.setStream(stream);
channel = newServerHttpChannelOverHTTP2(connector, httpConfig, transport);
channel.setUseOutputDirectByteBuffers(isUseOutputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("Creating channel {} for {}", channel, this);
}
stream.setAttachment(channel);
return channel;
}
protected ServerHttpChannelOverHTTP2 newServerHttpChannelOverHTTP2(Connector connector, HttpConfiguration httpConfig, HttpTransportOverHTTP2 transport)
{
return new ServerHttpChannelOverHTTP2(connector, httpConfig, getEndPoint(), transport);
}
private void offerHttpChannel(HttpChannelOverHTTP2 channel)
private HttpChannel pollHttpChannel()
{
HttpChannel httpChannel = null;
if (isRecycleHttpChannels())
{
try (AutoLock l = lock.lock())
{
channels.offer(channel);
}
}
httpChannel = httpChannels.poll();
if (httpChannel == null)
httpChannel = httpChannelFactory.newHttpChannel(this);
return httpChannel;
}

private HttpChannelOverHTTP2 pollHttpChannel()
void offerHttpChannel(HttpChannel channel)
{
if (isRecycleHttpChannels())
{
try (AutoLock l = lock.lock())
{
return channels.poll();
}
}
else
{
return null;
}
httpChannels.offer(channel);
}
*/

public boolean upgrade(Request request, HttpFields.Mutable responseFields)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ public void succeeded()
}
}
_httpChannel.recycle();
_connection.offerHttpChannel(_httpChannel);
}

@Override
Expand Down
Loading