Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jetty 10: backport the tracking retainable pool from 12 #12041

Merged
merged 18 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,42 @@ protected void removed(RetainableByteBuffer retainedBuffer)
ArrayByteBufferPool.this.release(retainedBuffer.getBuffer());
}
}

/**
* <p>A variant of {@link ArrayByteBufferPool} that tracks buffer
* acquires/releases of the retained buffers, useful to identify buffer leaks.</p>
* @see ArrayRetainableByteBufferPool.Tracking
*/
public static class Tracking extends ArrayByteBufferPool
{
public Tracking()
{
}

public Tracking(int minCapacity, int factor, int maxCapacity)
{
super(minCapacity, factor, maxCapacity);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxQueueLength)
{
super(minCapacity, factor, maxCapacity, maxQueueLength);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, long retainedHeapMemory, long retainedDirectMemory)
{
super(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, retainedHeapMemory, retainedDirectMemory);
}

@Override
protected RetainableByteBufferPool newRetainableByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
return new ArrayRetainableByteBufferPool.Tracking(0, factor, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@
package org.eclipse.jetty.io;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntUnaryOperator;
import java.util.stream.Collectors;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.NanoTime;
Expand Down Expand Up @@ -486,4 +494,161 @@ public String toString()
entries > 0 ? (inUse * 100) / entries : 0);
}
}

/**
* <p>A variant of {@link ArrayRetainableByteBufferPool} that tracks buffer
* acquires/releases, useful to identify buffer leaks.</p>
* <p>Use {@link #getLeaks()} when the system is idle to get
* the {@link Buffer}s that have been leaked, which contain
* the stack trace information of where the buffer was acquired.</p>
*/
public static class Tracking extends ArrayRetainableByteBufferPool
{
private static final Logger LOG = LoggerFactory.getLogger(Tracking.class);

private final Set<Buffer> buffers = ConcurrentHashMap.newKeySet();

public Tracking()
{
super();
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize)
{
super(minCapacity, factor, maxCapacity, maxBucketSize);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory);
}

public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize, IntUnaryOperator bucketIndexFor, IntUnaryOperator bucketCapacity, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity, factor, maxCapacity, maxBucketSize, bucketIndexFor, bucketCapacity, maxHeapMemory, maxDirectMemory);
}

@Override
public RetainableByteBuffer acquire(int size, boolean direct)
{
RetainableByteBuffer buffer = super.acquire(size, direct);
Buffer wrapper = new Buffer(buffer, size);
if (LOG.isDebugEnabled())
LOG.debug("acquired {}", wrapper);
buffers.add(wrapper);
return wrapper;
}

public Set<Buffer> getLeaks()
{
return buffers;
}

public String dumpLeaks()
{
return getLeaks().stream()
.map(Buffer::dump)
.collect(Collectors.joining(System.lineSeparator()));
}

public class Buffer extends RetainableByteBuffer
{
private final RetainableByteBuffer wrapped;
private final int size;
private final Instant acquireInstant;
private final Throwable acquireStack;
private final List<Throwable> retainStacks = new CopyOnWriteArrayList<>();
private final List<Throwable> releaseStacks = new CopyOnWriteArrayList<>();
private final List<Throwable> overReleaseStacks = new CopyOnWriteArrayList<>();

private Buffer(RetainableByteBuffer wrapped, int size)
{
super(wrapped.getBuffer(), x -> {});
this.wrapped = wrapped;
this.size = size;
this.acquireInstant = Instant.now();
this.acquireStack = new Throwable();
}

public int getSize()
{
return size;
}

public Instant getAcquireInstant()
{
return acquireInstant;
}

public Throwable getAcquireStack()
{
return acquireStack;
}

@Override
protected void acquire()
{
wrapped.acquire();
}

@Override
public boolean isRetained()
{
return wrapped.isRetained();
}

@Override
public void retain()
{
wrapped.retain();
retainStacks.add(new Throwable());
}

@Override
public boolean release()
{
try
{
boolean released = wrapped.release();
if (released)
{
buffers.remove(this);
if (LOG.isDebugEnabled())
LOG.debug("released {}", this);
}
releaseStacks.add(new Throwable());
return released;
}
catch (IllegalStateException e)
{
buffers.add(this);
overReleaseStacks.add(new Throwable());
throw e;
}
}

public String dump()
{
StringWriter w = new StringWriter();
PrintWriter pw = new PrintWriter(w);
getAcquireStack().printStackTrace(pw);
pw.println("\n" + retainStacks.size() + " retain(s)");
for (Throwable retainStack : retainStacks)
{
retainStack.printStackTrace(pw);
}
pw.println("\n" + releaseStacks.size() + " release(s)");
for (Throwable releaseStack : releaseStacks)
{
releaseStack.printStackTrace(pw);
}
pw.println("\n" + overReleaseStacks.size() + " over-release(s)");
for (Throwable overReleaseStack : overReleaseStacks)
{
overReleaseStack.printStackTrace(pw);
}
return String.format("%s@%x of %d bytes on %s wrapping %s acquired at %s", getClass().getSimpleName(), hashCode(), getSize(), getAcquireInstant(), wrapped, w);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public LogarithmicArrayByteBufferPool(int minCapacity, int maxCapacity, int maxQ
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes
* @param maxDirectMemory the max direct memory in bytes
* @param retainedHeapMemory the max heap memory in bytes, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedDirectMemory the max direct memory in bytes, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedHeapMemory the max heap memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedDirectMemory the max direct memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
*/
public LogarithmicArrayByteBufferPool(int minCapacity, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory, long retainedHeapMemory, long retainedDirectMemory)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public boolean isDirect()
* The reason why this method exists on top of {@link #retain()} is to be able to
* have some safety checks that must know why the ref counter is being incremented.
*/
void acquire()
protected void acquire()
{
if (references.getAndUpdate(c -> c == 0 ? 1 : c) != 0)
throw new IllegalStateException("re-pooled while still used " + this);
Expand Down
Loading