Skip to content

Commit

Permalink
reinstate HttpChannel reuse in H2 (#10868)
Browse files Browse the repository at this point in the history
Reinstated HttpChannel reuse in H2.

Signed-off-by: Ludovic Orban <[email protected]>
Signed-off-by: Simone Bordet <[email protected]>
Co-authored-by: Joakim Erdfelt <[email protected]>
Co-authored-by: Ludovic Orban <[email protected]>
Co-authored-by: Simone Bordet <[email protected]>
  • Loading branch information
4 people authored Nov 14, 2023
1 parent 49b3442 commit 4dd8bc9
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.util.Base64;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;

import org.eclipse.jetty.http.BadMessageException;
Expand Down Expand Up @@ -60,6 +62,8 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
private static final Logger LOG = LoggerFactory.getLogger(HTTP2ServerConnection.class);

private final HttpChannel.Factory httpChannelFactory = new HttpChannel.DefaultFactory();
// This unbounded queue will always be limited by the max number of concurrent streams per connection.
private final Queue<HttpChannel> httpChannels = new ConcurrentLinkedQueue<>();
private final Attributes attributes = new Lazy();
private final List<Frame> upgradeFrames = new ArrayList<>();
private final Connector connector;
Expand All @@ -68,6 +72,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
private final String id;
private final SocketAddress localSocketAddress;
private final SocketAddress remoteSocketAddress;
private boolean recycleHttpChannels;

public HTTP2ServerConnection(Connector connector, EndPoint endPoint, HttpConfiguration httpConfig, HTTP2ServerSession session, int inputBufferSize, ServerSessionListener listener)
{
Expand All @@ -78,6 +83,17 @@ public HTTP2ServerConnection(Connector connector, EndPoint endPoint, HttpConfigu
this.id = StringUtil.randomAlphaNumeric(16);
localSocketAddress = httpConfig.getLocalAddress() != null ? httpConfig.getLocalAddress() : endPoint.getLocalSocketAddress();
remoteSocketAddress = endPoint.getRemoteSocketAddress();
setRecycleHttpChannels(true);
}

public boolean isRecycleHttpChannels()
{
return recycleHttpChannels;
}

public void setRecycleHttpChannels(boolean recycleHttpChannels)
{
this.recycleHttpChannels = recycleHttpChannels;
}

@Override
Expand Down Expand Up @@ -122,7 +138,7 @@ public void onNewStream(HTTP2Stream stream, HeadersFrame frame)
if (LOG.isDebugEnabled())
LOG.debug("Processing {} on {}", frame, stream);

HttpChannel httpChannel = httpChannelFactory.newHttpChannel(this);
HttpChannel httpChannel = pollHttpChannel();
HttpStreamOverHTTP2 httpStream = new HttpStreamOverHTTP2(this, httpChannel, stream);
httpChannel.setHttpStream(httpStream);
stream.setAttachment(httpStream);
Expand Down Expand Up @@ -227,83 +243,29 @@ public void push(HTTP2Stream stream, MetaData.Request request)
if (LOG.isDebugEnabled())
LOG.debug("Processing push {} on {}", request, stream);

HttpChannel httpChannel = httpChannelFactory.newHttpChannel(this);
HttpChannel httpChannel = pollHttpChannel();
HttpStreamOverHTTP2 httpStream = new HttpStreamOverHTTP2(this, httpChannel, stream);
httpChannel.setHttpStream(httpStream);
Runnable task = httpStream.onPushRequest(request);
if (task != null)
offerTask(task, true);
}

/*
// TODO: re-instate recycle channel functionality, but using Pool.
private final AutoLock lock = new AutoLock();
private final Queue<HttpChannelOverHTTP2> channels = new ArrayDeque<>();
private boolean recycleHttpChannels = true;
public boolean isRecycleHttpChannels()
{
return recycleHttpChannels;
}
public void setRecycleHttpChannels(boolean recycleHttpChannels)
{
this.recycleHttpChannels = recycleHttpChannels;
}
private HttpChannelOverHTTP2 provideHttpChannel(Connector connector, IStream stream)
{
HttpChannelOverHTTP2 channel = pollHttpChannel();
if (channel != null)
{
channel.getHttpTransport().setStream(stream);
if (LOG.isDebugEnabled())
LOG.debug("Recycling channel {} for {}", channel, this);
}
else
{
HttpTransportOverHTTP2 transport = new HttpTransportOverHTTP2(connector, this);
transport.setStream(stream);
channel = newServerHttpChannelOverHTTP2(connector, httpConfig, transport);
channel.setUseOutputDirectByteBuffers(isUseOutputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("Creating channel {} for {}", channel, this);
}
stream.setAttachment(channel);
return channel;
}
protected ServerHttpChannelOverHTTP2 newServerHttpChannelOverHTTP2(Connector connector, HttpConfiguration httpConfig, HttpTransportOverHTTP2 transport)
{
return new ServerHttpChannelOverHTTP2(connector, httpConfig, getEndPoint(), transport);
}
private void offerHttpChannel(HttpChannelOverHTTP2 channel)
private HttpChannel pollHttpChannel()
{
HttpChannel httpChannel = null;
if (isRecycleHttpChannels())
{
try (AutoLock l = lock.lock())
{
channels.offer(channel);
}
}
httpChannel = httpChannels.poll();
if (httpChannel == null)
httpChannel = httpChannelFactory.newHttpChannel(this);
return httpChannel;
}

private HttpChannelOverHTTP2 pollHttpChannel()
void offerHttpChannel(HttpChannel channel)
{
if (isRecycleHttpChannels())
{
try (AutoLock l = lock.lock())
{
return channels.poll();
}
}
else
{
return null;
}
httpChannels.offer(channel);
}
*/

public boolean upgrade(Request request, HttpFields.Mutable responseFields)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ public void succeeded()
}
}
_httpChannel.recycle();
_connection.offerHttpChannel(_httpChannel);
}

@Override
Expand Down

0 comments on commit 4dd8bc9

Please sign in to comment.