Skip to content

Commit

Permalink
backport tracking pool from jetty 12
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <[email protected]>
  • Loading branch information
lorban committed Jul 15, 2024
1 parent aa5d3d0 commit 807bc74
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,42 @@ protected void removed(RetainableByteBuffer retainedBuffer)
ArrayByteBufferPool.this.release(retainedBuffer.getBuffer());
}
}

/**
* <p>A variant of {@link ArrayByteBufferPool} that tracks buffer
* acquires/releases of the retained buffers, useful to identify buffer leaks.</p>
* @see ArrayRetainableByteBufferPool.Tracking
*/
public static class Tracking extends ArrayByteBufferPool
{
public Tracking()
{
}

public Tracking(int minCapacity, int factor, int maxCapacity)
{
super(minCapacity, factor, maxCapacity);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxQueueLength)
{
super(minCapacity, factor, maxCapacity, maxQueueLength);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, long retainedHeapMemory, long retainedDirectMemory)
{
super(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, retainedHeapMemory, retainedDirectMemory);
}

@Override
protected RetainableByteBufferPool newRetainableByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
return new ArrayRetainableByteBufferPool.Tracking(0, factor, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@
package org.eclipse.jetty.io;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntUnaryOperator;
import java.util.stream.Collectors;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.NanoTime;
Expand Down Expand Up @@ -486,4 +494,161 @@ public String toString()
entries > 0 ? (inUse * 100) / entries : 0);
}
}

/**
* <p>A variant of {@link ArrayRetainableByteBufferPool} that tracks buffer
* acquires/releases, useful to identify buffer leaks.</p>
* <p>Use {@link #getLeaks()} when the system is idle to get
* the {@link Buffer}s that have been leaked, which contain
* the stack trace information of where the buffer was acquired.</p>
*/
public static class Tracking extends ArrayRetainableByteBufferPool
{
private static final Logger LOG = LoggerFactory.getLogger(Tracking.class);

private final Set<Buffer> buffers = ConcurrentHashMap.newKeySet();

public Tracking()
{
super();
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize)
{
super(minCapacity, factor, maxCapacity, maxBucketSize);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize, IntUnaryOperator bucketIndexFor, IntUnaryOperator bucketCapacity, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity, factor, maxCapacity, maxBucketSize, bucketIndexFor, bucketCapacity, maxHeapMemory, maxDirectMemory);
}

@Override
public RetainableByteBuffer acquire(int size, boolean direct)
{
RetainableByteBuffer buffer = super.acquire(size, direct);
Buffer wrapper = new Buffer(buffer, size);
if (LOG.isDebugEnabled())
LOG.debug("acquired {}", wrapper);
buffers.add(wrapper);
return wrapper;
}

public Set<Buffer> getLeaks()
{
return buffers;
}

public String dumpLeaks()
{
return getLeaks().stream()
.map(Buffer::dump)
.collect(Collectors.joining(System.lineSeparator()));
}

public class Buffer extends RetainableByteBuffer
{
private final RetainableByteBuffer wrapped;
private final int size;
private final Instant acquireInstant;
private final Throwable acquireStack;
private final List<Throwable> retainStacks = new CopyOnWriteArrayList<>();
private final List<Throwable> releaseStacks = new CopyOnWriteArrayList<>();
private final List<Throwable> overReleaseStacks = new CopyOnWriteArrayList<>();

private Buffer(RetainableByteBuffer wrapped, int size)
{
super(wrapped.getBuffer(), x -> {});
this.wrapped = wrapped;
this.size = size;
this.acquireInstant = Instant.now();
this.acquireStack = new Throwable();
}

public int getSize()
{
return size;
}

public Instant getAcquireInstant()
{
return acquireInstant;
}

public Throwable getAcquireStack()
{
return acquireStack;
}

@Override
protected void acquire()
{
wrapped.acquire();
}

@Override
public boolean isRetained()
{
return wrapped.isRetained();
}

@Override
public void retain()
{
wrapped.retain();
retainStacks.add(new Throwable());
}

@Override
public boolean release()
{
try
{
boolean released = wrapped.release();
if (released)
{
buffers.remove(this);
if (LOG.isDebugEnabled())
LOG.debug("released {}", this);
}
releaseStacks.add(new Throwable());
return released;
}
catch (IllegalStateException e)
{
buffers.add(this);
overReleaseStacks.add(new Throwable());
throw e;
}
}

public String dump()
{
StringWriter w = new StringWriter();
PrintWriter pw = new PrintWriter(w);
getAcquireStack().printStackTrace(pw);
pw.println("\n" + retainStacks.size() + " retain(s)");
for (Throwable retainStack : retainStacks)
{
retainStack.printStackTrace(pw);
}
pw.println("\n" + releaseStacks.size() + " release(s)");
for (Throwable releaseStack : releaseStacks)
{
releaseStack.printStackTrace(pw);
}
pw.println("\n" + overReleaseStacks.size() + " over-release(s)");
for (Throwable overReleaseStack : overReleaseStacks)
{
overReleaseStack.printStackTrace(pw);
}
return String.format("%s@%x of %d bytes on %s wrapping %s acquired at %s", getClass().getSimpleName(), hashCode(), getSize(), getAcquireInstant(), wrapped, w);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public boolean isDirect()
* The reason why this method exists on top of {@link #retain()} is to be able to
* have some safety checks that must know why the ref counter is being incremented.
*/
void acquire()
protected void acquire()
{
if (references.getAndUpdate(c -> c == 0 ? 1 : c) != 0)
throw new IllegalStateException("re-pooled while still used " + this);
Expand Down

0 comments on commit 807bc74

Please sign in to comment.