Skip to content

Commit

Permalink
#12082 - fix DynamicCapacity.release()
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <[email protected]>
  • Loading branch information
lorban committed Aug 7, 2024
1 parent 2ba2250 commit ceba33f
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
private final Collection<HTTP2Session.Entry> processedEntries = new ArrayList<>();
private final HTTP2Session session;
private final RetainableByteBuffer.Mutable accumulator;
private boolean released;
private InvocationType invocationType = InvocationType.NON_BLOCKING;
private Throwable terminated;
private HTTP2Session.Entry stalledEntry;
Expand Down Expand Up @@ -308,7 +307,7 @@ protected void onSuccess()

private void finish()
{
release();
accumulator.clear();
processedEntries.forEach(HTTP2Session.Entry::succeeded);
processedEntries.clear();
invocationType = InvocationType.NON_BLOCKING;
Expand All @@ -328,15 +327,6 @@ private void finish()
}
}

private void release()
{
if (!released)
{
released = true;
accumulator.release();
}
}

@Override
protected void onCompleteSuccess()
{
Expand All @@ -346,7 +336,7 @@ protected void onCompleteSuccess()
@Override
protected void onCompleteFailure(Throwable x)
{
release();
accumulator.release();

Throwable closed;
Set<HTTP2Session.Entry> allEntries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public ByteBuffer badMessageError(int status, String reason, HttpFields.Mutable
Thread.sleep(1000);

// Send a second request and verify that it hits the Handler.
accumulator.release();
accumulator.clear();
MetaData.Request metaData2 = new MetaData.Request(
HttpMethod.GET.asString(),
HttpScheme.HTTP.asString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.eclipse.jetty.http2.tests;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
Expand Down Expand Up @@ -572,7 +573,7 @@ public void onPreface()
}
catch (HpackException x)
{
x.printStackTrace();
throw new RuntimeException(x);
}
}

Expand All @@ -589,7 +590,7 @@ public void onHeaders(HeadersFrame request)
}
catch (HpackException x)
{
x.printStackTrace();
throw new RuntimeException(x);
}
}

Expand All @@ -599,11 +600,11 @@ private void writeFrames()
{
// Write the frames.
accumulator.writeTo(Content.Sink.from(output), false);
accumulator.release();
accumulator.clear();
}
catch (Throwable x)
catch (IOException x)
{
x.printStackTrace();
throw new RuntimeException(x);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1445,7 +1445,7 @@ public DynamicCapacity(ByteBufferPool pool, boolean direct, long maxSize, int ag

private DynamicCapacity(List<RetainableByteBuffer> buffers, ByteBufferPool.Sized pool, long maxSize, int minRetainSize)
{
super();
super(new ReferenceCounter()); // Make sure the wrapped retainable is a ReferenceCounter.
_pool = pool == null ? ByteBufferPool.SIZED_NON_POOLING : pool;
_maxSize = maxSize < 0 ? Long.MAX_VALUE : maxSize;
_buffers = buffers == null ? new ArrayList<>() : buffers;
Expand All @@ -1456,6 +1456,13 @@ private DynamicCapacity(List<RetainableByteBuffer> buffers, ByteBufferPool.Sized
throw new IllegalArgumentException("must always retain if cannot aggregate");
}

private void checkNotReleased()
{
ReferenceCounter counter = (ReferenceCounter)getWrapped();
if (counter.get() == 0)
throw new IllegalStateException("Already released");
}

public long getMaxSize()
{
return _maxSize;
Expand Down Expand Up @@ -1490,6 +1497,7 @@ public ByteBuffer getByteBuffer() throws BufferOverflowException
{
if (LOG.isDebugEnabled())
LOG.debug("getByteBuffer {}", this);
checkNotReleased();
return switch (_buffers.size())
{
case 0 -> BufferUtil.EMPTY_BUFFER;
Expand Down Expand Up @@ -1523,6 +1531,7 @@ public RetainableByteBuffer take(long length)
{
if (LOG.isDebugEnabled())
LOG.debug("take {} {}", this, length);
checkNotReleased();

if (_buffers.isEmpty() || length == 0)
return RetainableByteBuffer.EMPTY;
Expand Down Expand Up @@ -1588,6 +1597,7 @@ public RetainableByteBuffer takeFrom(long skip)
{
if (LOG.isDebugEnabled())
LOG.debug("take {} {}", this, skip);
checkNotReleased();

if (_buffers.isEmpty() || skip > size())
return RetainableByteBuffer.EMPTY;
Expand Down Expand Up @@ -1653,6 +1663,7 @@ public byte[] takeByteArray()
{
if (LOG.isDebugEnabled())
LOG.debug("takeByteArray {}", this);
checkNotReleased();
return switch (_buffers.size())
{
case 0 -> BufferUtil.EMPTY_BUFFER.array();
Expand Down Expand Up @@ -1698,6 +1709,7 @@ public byte get() throws BufferUnderflowException
{
if (LOG.isDebugEnabled())
LOG.debug("get {}", this);
checkNotReleased();
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();)
{
RetainableByteBuffer buffer = i.next();
Expand All @@ -1724,6 +1736,7 @@ public byte get(long index) throws IndexOutOfBoundsException
{
if (LOG.isDebugEnabled())
LOG.debug("get {} {}", this, index);
checkNotReleased();
for (RetainableByteBuffer buffer : _buffers)
{
long size = buffer.size();
Expand All @@ -1739,6 +1752,7 @@ public int get(byte[] bytes, int offset, int length)
{
if (LOG.isDebugEnabled())
LOG.debug("get array {} {}", this, length);
checkNotReleased();
int got = 0;
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); length > 0 && i.hasNext();)
{
Expand Down Expand Up @@ -1766,6 +1780,7 @@ public boolean isDirect()
@Override
public boolean hasRemaining()
{
checkNotReleased();
for (RetainableByteBuffer rbb : _buffers)
if (!rbb.isEmpty())
return true;
Expand All @@ -1777,6 +1792,7 @@ public long skip(long length)
{
if (LOG.isDebugEnabled())
LOG.debug("skip {} {}", this, length);
checkNotReleased();
long skipped = 0;
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); length > 0 && i.hasNext();)
{
Expand All @@ -1799,6 +1815,7 @@ public void limit(long limit)
{
if (LOG.isDebugEnabled())
LOG.debug("limit {} {}", this, limit);
checkNotReleased();
for (Iterator<RetainableByteBuffer> i = _buffers.iterator(); i.hasNext();)
{
RetainableByteBuffer buffer = i.next();
Expand Down Expand Up @@ -1826,6 +1843,7 @@ public Mutable slice()
{
if (LOG.isDebugEnabled())
LOG.debug("slice {}", this);
checkNotReleased();
List<RetainableByteBuffer> buffers = new ArrayList<>(_buffers.size());
for (RetainableByteBuffer rbb : _buffers)
buffers.add(rbb.slice());
Expand All @@ -1837,6 +1855,7 @@ public Mutable slice(long length)
{
if (LOG.isDebugEnabled())
LOG.debug("slice {} {}", this, length);
checkNotReleased();
List<RetainableByteBuffer> buffers = new ArrayList<>(_buffers.size());
for (Iterator<RetainableByteBuffer> i = _buffers.iterator(); i.hasNext();)
{
Expand Down Expand Up @@ -1879,6 +1898,7 @@ public RetainableByteBuffer copy()
{
if (LOG.isDebugEnabled())
LOG.debug("copy {}", this);
checkNotReleased();
List<RetainableByteBuffer> buffers = new ArrayList<>(_buffers.size());
for (RetainableByteBuffer rbb : _buffers)
buffers.add(rbb.copy());
Expand All @@ -1900,6 +1920,7 @@ public int remaining()
@Override
public long size()
{
checkNotReleased();
long length = 0;
for (RetainableByteBuffer buffer : _buffers)
length += buffer.remaining();
Expand Down Expand Up @@ -1928,6 +1949,7 @@ public boolean release()
{
if (LOG.isDebugEnabled())
LOG.debug("release {}", this);
checkNotReleased();
if (super.release())
{
for (RetainableByteBuffer buffer : _buffers)
Expand All @@ -1944,6 +1966,7 @@ public void clear()
{
if (LOG.isDebugEnabled())
LOG.debug("clear {}", this);
checkNotReleased();
if (_buffers.isEmpty())
return;
_aggregate = null;
Expand All @@ -1957,8 +1980,10 @@ public boolean append(ByteBuffer bytes)
{
if (LOG.isDebugEnabled())
LOG.debug("append BB {} <- {}", this, BufferUtil.toDetailString(bytes));
checkNotReleased();
// Cannot mutate contents if retained
assert !isRetained();
if (isRetained())
throw new IllegalStateException("Cannot append to a retained instance");

// handle empty appends
if (bytes == null)
Expand Down Expand Up @@ -2056,9 +2081,10 @@ public boolean append(RetainableByteBuffer retainableBytes)
{
if (LOG.isDebugEnabled())
LOG.debug("append RBB {} {}", this, retainableBytes);

checkNotReleased();
// Cannot mutate contents if retained
assert !isRetained();
if (isRetained())
throw new IllegalStateException("Cannot append to a retained instance");

// Optimize appending dynamics
if (retainableBytes instanceof DynamicCapacity dynamicCapacity)
Expand Down Expand Up @@ -2118,6 +2144,7 @@ public Mutable add(ByteBuffer bytes) throws ReadOnlyBufferException, BufferOverf
{
if (LOG.isDebugEnabled())
LOG.debug("add BB {} <- {}", this, BufferUtil.toDetailString(bytes));
checkNotReleased();
add(RetainableByteBuffer.wrap(bytes));
return this;
}
Expand All @@ -2127,6 +2154,7 @@ public Mutable add(RetainableByteBuffer bytes) throws ReadOnlyBufferException, B
{
if (LOG.isDebugEnabled())
LOG.debug("add RBB {} <- {}", this, bytes);
checkNotReleased();
long size = size();
long space = _maxSize - size;
long length = bytes.size();
Expand All @@ -2147,13 +2175,15 @@ public Mutable add(RetainableByteBuffer bytes) throws ReadOnlyBufferException, B
@Override
public Mutable put(byte b)
{
checkNotReleased();
ensure(1).put(b);
return this;
}

@Override
public Mutable put(long index, byte b)
{
checkNotReleased();
for (RetainableByteBuffer buffer : _buffers)
{
long size = buffer.size();
Expand All @@ -2170,27 +2200,31 @@ public Mutable put(long index, byte b)
@Override
public Mutable putShort(short s)
{
checkNotReleased();
ensure(2).putShort(s);
return this;
}

@Override
public Mutable putInt(int i)
{
checkNotReleased();
ensure(4).putInt(i);
return this;
}

@Override
public Mutable putLong(long l)
{
checkNotReleased();
ensure(8).putLong(l);
return this;
}

@Override
public Mutable put(byte[] bytes, int offset, int length)
{
checkNotReleased();
// Use existing aggregate if the length is large and there is space for at least half
if (length >= 16 && _aggregate != null)
{
Expand Down Expand Up @@ -2252,6 +2286,7 @@ public boolean appendTo(ByteBuffer to)
{
if (LOG.isDebugEnabled())
LOG.debug("appendTo BB {} -> {}", this, BufferUtil.toDetailString(to));
checkNotReleased();
_aggregate = null;
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();)
{
Expand All @@ -2269,6 +2304,7 @@ public boolean appendTo(RetainableByteBuffer to)
{
if (LOG.isDebugEnabled())
LOG.debug("appendTo RBB {} -> {}", this, to);
checkNotReleased();
_aggregate = null;
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();)
{
Expand All @@ -2286,6 +2322,7 @@ public void putTo(ByteBuffer toInfillMode)
{
if (LOG.isDebugEnabled())
LOG.debug("putTo BB {} -> {}", this, toInfillMode);
checkNotReleased();
_aggregate = null;
for (Iterator<RetainableByteBuffer> i = _buffers.listIterator(); i.hasNext();)
{
Expand All @@ -2301,6 +2338,7 @@ public void writeTo(Content.Sink sink, boolean last, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("writeTo {} -> {} {} {}", this, sink, last, callback);
checkNotReleased();
_aggregate = null;
switch (_buffers.size())
{
Expand Down

0 comments on commit ceba33f

Please sign in to comment.