Skip to content

Commit

Permalink
RATIS-2164. LeakDetector has a race condition. (#1163)
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo authored Oct 16, 2024
1 parent 62ae6d9 commit a15bde1
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 102 deletions.
122 changes: 90 additions & 32 deletions ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@

import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* Simple general resource leak detector using {@link ReferenceQueue} and {@link java.lang.ref.WeakReference} to
Expand Down Expand Up @@ -55,13 +59,61 @@
*/
public class LeakDetector {
private static final Logger LOG = LoggerFactory.getLogger(LeakDetector.class);

private static class LeakTrackerSet {
private final Set<LeakTracker> set = Collections.newSetFromMap(new HashMap<>());

synchronized boolean remove(LeakTracker tracker) {
return set.remove(tracker);
}

synchronized void removeExisting(LeakTracker tracker) {
final boolean removed = set.remove(tracker);
Preconditions.assertTrue(removed, () -> "Failed to remove existing " + tracker);
}

synchronized LeakTracker add(Object referent, ReferenceQueue<Object> queue, Supplier<String> leakReporter) {
final LeakTracker tracker = new LeakTracker(referent, queue, this::removeExisting, leakReporter);
final boolean added = set.add(tracker);
Preconditions.assertTrue(added, () -> "Failed to add " + tracker + " for " + referent);
return tracker;
}

synchronized int getNumLeaks(boolean throwException) {
if (set.isEmpty()) {
return 0;
}

int n = 0;
for (LeakTracker tracker : set) {
if (tracker.reportLeak() != null) {
n++;
}
}
if (throwException) {
assertNoLeaks(n);
}
return n;
}

synchronized void assertNoLeaks(int leaks) {
Preconditions.assertTrue(leaks == 0, () -> {
final int size = set.size();
return "#leaks = " + leaks + " > 0, #leaks " + (leaks == size? "==" : "!=") + " set.size = " + size;
});
}
}

private static final AtomicLong COUNTER = new AtomicLong();

private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
private final Set<LeakTracker> allLeaks = Collections.newSetFromMap(new ConcurrentHashMap<>());
/** All the {@link LeakTracker}s. */
private final LeakTrackerSet trackers = new LeakTrackerSet();
/** When a leak is discovered, a message is printed and added to this list. */
private final List<String> leakMessages = Collections.synchronizedList(new ArrayList<>());
private final String name;

public LeakDetector(String name) {
LeakDetector(String name) {
this.name = name + COUNTER.getAndIncrement();
}

Expand All @@ -80,8 +132,11 @@ private void run() {
LeakTracker tracker = (LeakTracker) queue.remove();
// Original resource already been GCed, if tracker is not closed yet,
// report a leak.
if (allLeaks.remove(tracker)) {
tracker.reportLeak();
if (trackers.remove(tracker)) {
final String leak = tracker.reportLeak();
if (leak != null) {
leakMessages.add(leak);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -93,48 +148,51 @@ private void run() {
LOG.warn("Exiting leak detector {}.", name);
}

public UncheckedAutoCloseable track(Object leakable, Runnable reportLeak) {
// A rate filter can be put here to only track a subset of all objects, e.g. 5%, 10%,
Runnable track(Object leakable, Supplier<String> reportLeak) {
// TODO: A rate filter can be put here to only track a subset of all objects, e.g. 5%, 10%,
// if we have proofs that leak tracking impacts performance, or a single LeakDetector
// thread can't keep up with the pace of object allocation.
// For now, it looks effective enough and let keep it simple.
LeakTracker tracker = new LeakTracker(leakable, queue, allLeaks, reportLeak);
allLeaks.add(tracker);
return tracker;
return trackers.add(leakable, queue, reportLeak)::remove;
}

public void assertNoLeaks() {
Preconditions.assertTrue(allLeaks.isEmpty(), this::allLeaksString);
}
public void assertNoLeaks(int maxRetries, TimeDuration retrySleep) throws InterruptedException {
synchronized (leakMessages) {
// leakMessages are all the leaks discovered so far.
Preconditions.assertTrue(leakMessages.isEmpty(),
() -> "#leaks = " + leakMessages.size() + "\n" + leakMessages);
}

String allLeaksString() {
if (allLeaks.isEmpty()) {
return "allLeaks = <empty>";
for(int i = 0; i < maxRetries; i++) {
final int numLeaks = trackers.getNumLeaks(false);
if (numLeaks == 0) {
return;
}
LOG.warn("{}/{}) numLeaks == {} > 0, will wait and retry ...", i, maxRetries, numLeaks);
retrySleep.sleep();
}
allLeaks.forEach(LeakTracker::reportLeak);
return "allLeaks.size = " + allLeaks.size();
trackers.getNumLeaks(true);
}

private static final class LeakTracker extends WeakReference<Object> implements UncheckedAutoCloseable {
private final Set<LeakTracker> allLeaks;
private final Runnable leakReporter;
private static final class LeakTracker extends WeakReference<Object> {
private final Consumer<LeakTracker> removeMethod;
private final Supplier<String> getLeakMessage;

LeakTracker(Object referent, ReferenceQueue<Object> referenceQueue,
Set<LeakTracker> allLeaks, Runnable leakReporter) {
Consumer<LeakTracker> removeMethod, Supplier<String> getLeakMessage) {
super(referent, referenceQueue);
this.allLeaks = allLeaks;
this.leakReporter = leakReporter;
this.removeMethod = removeMethod;
this.getLeakMessage = getLeakMessage;
}

/**
* Called by the tracked resource when closing.
*/
@Override
public void close() {
allLeaks.remove(this);
/** Called by the tracked resource when the object is completely released. */
void remove() {
removeMethod.accept(this);
}

void reportLeak() {
leakReporter.run();
/** @return the leak message if there is a leak; return null if there is no leak. */
String reportLeak() {
return getLeakMessage.get();
}
}
}
Loading

0 comments on commit a15bde1

Please sign in to comment.