Skip to content

Commit

Permalink
Fixes #9166 - Jetty 12: review/remove ByteBufferPool (#9195)
Browse files Browse the repository at this point in the history
* Fixes #9166 - Jetty 12: review/remove ByteBufferPool

* Replaced usages of ByteBufferPool with RetainableByteBufferPool.
* Removed ByteBufferPool and related classes.
* Renamed oej.http2.frames.DataFrame.getData() -> getByteBuffer() for consistency.
* Removed Accumulator.acquire(), and updated code to use RetainableByteBufferPool.acquire() instead.
* Fixed HttpOutput callbacks to correctly call super.onCompleteSuccess() and super.onCompleteFailure().

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet authored Jan 27, 2023
1 parent 63d963d commit ded18f5
Show file tree
Hide file tree
Showing 294 changed files with 3,625 additions and 5,361 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ The `ClientConnector` primarily wraps the link:{javadoc-url}/org/eclipse/jetty/i

* a thread pool (in form of an `java.util.concurrent.Executor`)
* a scheduler (in form of `org.eclipse.jetty.util.thread.Scheduler`)
* a byte buffer pool (in form of `org.eclipse.jetty.io.ByteBufferPool`)
* a byte buffer pool (in form of `org.eclipse.jetty.io.RetainableByteBufferPool`)
* a TLS factory (in form of `org.eclipse.jetty.util.ssl.SslContextFactory.Client`)

The `ClientConnector` is where you want to set those components after you have configured them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void onDataAvailable(Stream stream)
}

// Get the content buffer.
ByteBuffer byteBuffer = data.frame().getData();
ByteBuffer byteBuffer = data.frame().getByteBuffer();

// Unwrap the Data object, converting it to a Chunk.
// The Data.release() semantic is maintained in the completion of the Callback.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public void writeLine(String line, Callback callback)

// Wrap the "telnet" ClientConnectionFactory with the SslClientConnectionFactory.
connectionFactory = new SslClientConnectionFactory(clientConnector.getSslContextFactory(),
clientConnector.getByteBufferPool(), clientConnector.getExecutor(), connectionFactory);
clientConnector.getRetainableByteBufferPool(), clientConnector.getExecutor(), connectionFactory);

// We will obtain a SslConnection now.
CompletableFuture<SslConnection> connectionPromise = new Promise.Completable<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.eclipse.jetty.http2.client.transport.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.client.transport.HttpClientTransportOverHTTP3;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Content;
Expand Down Expand Up @@ -334,14 +333,13 @@ public void asyncRequestContent() throws Exception
// An event happens in some other class, in some other thread.
class ContentPublisher
{
void publish(ByteBufferPool bufferPool, byte[] bytes, boolean lastContent)
void publish(byte[] bytes, boolean lastContent)
{
// Wrap the bytes into a new ByteBuffer.
ByteBuffer buffer = ByteBuffer.wrap(bytes);

// Offer the content, and release the ByteBuffer
// to the pool when the Callback is completed.
content.write(buffer, Callback.from(() -> bufferPool.release(buffer)));
// Write the content.
content.write(buffer, Callback.NOOP);

// Close AsyncRequestContent when all the content is arrived.
if (lastContent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public void onDataAvailable(Stream stream)
}

// Get the content buffer.
ByteBuffer buffer = data.frame().getData();
ByteBuffer buffer = data.frame().getByteBuffer();

// Consume the buffer, here - as an example - just log it.
System.getLogger("http2").log(INFO, "Consuming buffer {0}", buffer);
Expand Down Expand Up @@ -368,7 +368,7 @@ public void onDataAvailable(Stream stream)
}

// The pushed stream "response" content bytes.
ByteBuffer buffer = data.frame().getData();
ByteBuffer buffer = data.frame().getByteBuffer();
// Consume the buffer and release the Data object.
data.release();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void onDataAvailable(Stream stream)
}

// Get the content buffer.
ByteBuffer buffer = data.frame().getData();
ByteBuffer buffer = data.frame().getByteBuffer();

// Consume the buffer, here - as an example - just log it.
System.getLogger("http2").log(INFO, "Consuming buffer {0}", buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected void doStart() throws Exception
{
HttpClient httpClient = getHttpClient();
connector.setBindAddress(httpClient.getBindAddress());
connector.setByteBufferPool(httpClient.getByteBufferPool());
connector.setRetainableByteBufferPool(httpClient.getRetainableByteBufferPool());
connector.setConnectBlocking(httpClient.isConnectBlocking());
connector.setConnectTimeout(Duration.ofMillis(httpClient.getConnectTimeout()));
connector.setExecutor(httpClient.getExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.RetainableByteBuffer;

/**
* {@link ContentDecoder} decodes content bytes of a response.
Expand All @@ -29,21 +30,14 @@
public interface ContentDecoder
{
/**
* <p>Decodes the bytes in the given {@code buffer} and returns decoded bytes, if any.</p>
* <p>Decodes the bytes in the given {@code buffer} and returns the decoded bytes.</p>
* <p>The returned {@link RetainableByteBuffer} containing the decoded bytes may
* be empty and <b>must</b> be released via {@link RetainableByteBuffer#release()}.</p>
*
* @param buffer the buffer containing encoded bytes
* @return a buffer containing decoded bytes, if any
* @return a buffer containing decoded bytes that must be released
*/
public abstract ByteBuffer decode(ByteBuffer buffer);

/**
* <p>Releases the ByteBuffer returned by {@link #decode(ByteBuffer)}.</p>
*
* @param decoded the ByteBuffer returned by {@link #decode(ByteBuffer)}
*/
public default void release(ByteBuffer decoded)
{
}
public abstract RetainableByteBuffer decode(ByteBuffer buffer);

/**
* Factory for {@link ContentDecoder}s; subclasses must implement {@link #newContentDecoder()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@

package org.eclipse.jetty.client;

import java.nio.ByteBuffer;

import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;

/**
* {@link ContentDecoder} for the "gzip" encoding.
Expand All @@ -34,13 +33,13 @@ public GZIPContentDecoder(int bufferSize)
this(null, bufferSize);
}

public GZIPContentDecoder(ByteBufferPool byteBufferPool, int bufferSize)
public GZIPContentDecoder(RetainableByteBufferPool retainableByteBufferPool, int bufferSize)
{
super(byteBufferPool, bufferSize);
super(retainableByteBufferPool, bufferSize);
}

@Override
protected boolean decodedChunk(ByteBuffer chunk)
protected boolean decodedChunk(RetainableByteBuffer chunk)
{
super.decodedChunk(chunk);
return true;
Expand All @@ -51,8 +50,8 @@ protected boolean decodedChunk(ByteBuffer chunk)
*/
public static class Factory extends ContentDecoder.Factory
{
private final RetainableByteBufferPool retainableByteBufferPool;
private final int bufferSize;
private final ByteBufferPool byteBufferPool;

public Factory()
{
Expand All @@ -64,22 +63,22 @@ public Factory(int bufferSize)
this(null, bufferSize);
}

public Factory(ByteBufferPool byteBufferPool)
public Factory(RetainableByteBufferPool retainableByteBufferPool)
{
this(byteBufferPool, DEFAULT_BUFFER_SIZE);
this(retainableByteBufferPool, DEFAULT_BUFFER_SIZE);
}

public Factory(ByteBufferPool byteBufferPool, int bufferSize)
public Factory(RetainableByteBufferPool retainableByteBufferPool, int bufferSize)
{
super("gzip");
this.byteBufferPool = byteBufferPool;
this.retainableByteBufferPool = retainableByteBufferPool;
this.bufferSize = bufferSize;
}

@Override
public ContentDecoder newContentDecoder()
{
return new GZIPContentDecoder(byteBufferPool, bufferSize);
return new GZIPContentDecoder(retainableByteBufferPool, bufferSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,8 @@
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.ArrayRetainableByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.Fields;
Expand Down Expand Up @@ -201,11 +199,9 @@ protected void doStart() throws Exception
int maxBucketSize = executor instanceof ThreadPool.SizedThreadPool
? ((ThreadPool.SizedThreadPool)executor).getMaxThreads() / 2
: ProcessorUtils.availableProcessors() * 2;
ByteBufferPool byteBufferPool = getByteBufferPool();
if (byteBufferPool == null)
setByteBufferPool(new MappedByteBufferPool(2048, maxBucketSize));
if (getBean(RetainableByteBufferPool.class) == null)
addBean(new ArrayRetainableByteBufferPool(0, 2048, 65536, maxBucketSize));
RetainableByteBufferPool retainableByteBufferPool = getRetainableByteBufferPool();
if (retainableByteBufferPool == null)
setRetainableByteBufferPool(new ArrayRetainableByteBufferPool(0, 2048, 65536, maxBucketSize));
Scheduler scheduler = getScheduler();
if (scheduler == null)
{
Expand All @@ -224,7 +220,7 @@ protected void doStart() throws Exception
handlers.put(new ProxyAuthenticationProtocolHandler(this));
handlers.put(new UpgradeProtocolHandler());

decoderFactories.put(new GZIPContentDecoder.Factory(byteBufferPool));
decoderFactories.put(new GZIPContentDecoder.Factory(retainableByteBufferPool));

cookieManager = newCookieManager();
cookieStore = cookieManager.getCookieStore();
Expand Down Expand Up @@ -650,19 +646,19 @@ public ProtocolHandler findProtocolHandler(Request request, Response response)
}

/**
* @return the {@link ByteBufferPool} of this HttpClient
* @return the {@link RetainableByteBufferPool} of this HttpClient
*/
public ByteBufferPool getByteBufferPool()
public RetainableByteBufferPool getRetainableByteBufferPool()
{
return connector.getByteBufferPool();
return connector.getRetainableByteBufferPool();
}

/**
* @param byteBufferPool the {@link ByteBufferPool} of this HttpClient
* @param retainableByteBufferPool the {@link RetainableByteBufferPool} of this HttpClient
*/
public void setByteBufferPool(ByteBufferPool byteBufferPool)
public void setRetainableByteBufferPool(RetainableByteBufferPool retainableByteBufferPool)
{
connector.setByteBufferPool(byteBufferPool);
connector.setRetainableByteBufferPool(retainableByteBufferPool);
}

/**
Expand Down Expand Up @@ -1156,6 +1152,6 @@ public ClientConnectionFactory newSslClientConnectionFactory(SslContextFactory.C
{
if (sslContextFactory == null)
sslContextFactory = getSslContextFactory();
return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), connectionFactory);
return new SslClientConnectionFactory(sslContextFactory, getRetainableByteBufferPool(), getExecutor(), connectionFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package org.eclipse.jetty.client.internal;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -27,8 +26,8 @@
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.content.ContentSourceTransformer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.SerializedInvoker;
Expand Down Expand Up @@ -578,17 +577,17 @@ protected Content.Chunk transform(Content.Chunk inputChunk)
_chunk.retain();
if (LOG.isDebugEnabled())
LOG.debug("decoding: {}", _chunk);
ByteBuffer decodedBuffer = _decoder.decode(_chunk.getByteBuffer());
RetainableByteBuffer decodedBuffer = _decoder.decode(_chunk.getByteBuffer());
if (LOG.isDebugEnabled())
LOG.debug("decoded: {}", BufferUtil.toDetailString(decodedBuffer));
LOG.debug("decoded: {}", decodedBuffer);

if (BufferUtil.hasContent(decodedBuffer))
if (decodedBuffer != null && decodedBuffer.hasRemaining())
{
// The decoded ByteBuffer is a transformed "copy" of the
// compressed one, so it has its own reference counter.
if (LOG.isDebugEnabled())
LOG.debug("returning decoded content");
return Content.Chunk.from(decodedBuffer, false, _decoder::release);
return Content.Chunk.asChunk(decodedBuffer.getByteBuffer(), false, decodedBuffer);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
parser.setHeaderCacheSize(httpTransport.getHeaderCacheSize());
parser.setHeaderCacheCaseSensitive(httpTransport.isHeaderCacheCaseSensitive());
}
retainableByteBufferPool = httpClient.getByteBufferPool().asRetainableByteBufferPool();
retainableByteBufferPool = httpClient.getRetainableByteBufferPool();
}

void receive()
Expand Down Expand Up @@ -169,7 +169,7 @@ private HttpConnectionOverHTTP getHttpConnection()

protected ByteBuffer getResponseBuffer()
{
return networkBuffer == null ? null : networkBuffer.getBuffer();
return networkBuffer == null ? null : networkBuffer.getByteBuffer();
}

private void acquireNetworkBuffer()
Expand Down Expand Up @@ -222,7 +222,7 @@ protected ByteBuffer onUpgradeFrom()
HttpClient client = getHttpDestination().getHttpClient();
upgradeBuffer = BufferUtil.allocate(networkBuffer.remaining(), client.isUseInputDirectByteBuffers());
BufferUtil.clearToFill(upgradeBuffer);
BufferUtil.put(networkBuffer.getBuffer(), upgradeBuffer);
BufferUtil.put(networkBuffer.getByteBuffer(), upgradeBuffer);
BufferUtil.flipToFlush(upgradeBuffer, 0);
}
releaseNetworkBuffer();
Expand All @@ -245,7 +245,7 @@ private boolean parseAndFill()
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("Parsing {} in {}", BufferUtil.toDetailString(networkBuffer.getBuffer()), this);
LOG.debug("Parsing {} in {}", BufferUtil.toDetailString(networkBuffer.getByteBuffer()), this);
// Always parse even empty buffers to advance the parser.
if (parse())
{
Expand All @@ -269,7 +269,7 @@ private boolean parseAndFill()
reacquireNetworkBuffer();

// The networkBuffer may have been reacquired.
int read = endPoint.fill(networkBuffer.getBuffer());
int read = endPoint.fill(networkBuffer.getByteBuffer());
if (LOG.isDebugEnabled())
LOG.debug("Read {} bytes in {} from {} in {}", read, networkBuffer, endPoint, this);

Expand Down Expand Up @@ -309,7 +309,7 @@ private boolean parse()
{
while (true)
{
boolean handle = parser.parseNext(networkBuffer.getBuffer());
boolean handle = parser.parseNext(networkBuffer.getByteBuffer());
if (LOG.isDebugEnabled())
LOG.debug("Parse result={} on {}", handle, this);
Runnable action = getAndSetAction(null);
Expand Down Expand Up @@ -347,7 +347,7 @@ private boolean parse()
if (getHttpChannel().isTunnel(method, status))
return true;

if (networkBuffer.isEmpty())
if (!networkBuffer.hasRemaining())
return false;

if (!HttpStatus.isInformational(status))
Expand All @@ -359,7 +359,7 @@ private boolean parse()
return false;
}

if (networkBuffer.isEmpty())
if (!networkBuffer.hasRemaining())
return false;
}
}
Expand Down
Loading

0 comments on commit ded18f5

Please sign in to comment.