Skip to content

Commit

Permalink
CURATOR-710: Fix leaking watch in EnsembleTracker
Browse files Browse the repository at this point in the history
CURATOR-667(#474) fixes asynchronous event path for `getConfig` to
"/zookeeper/config" by using `CuratorFramework::usingNamespace(null)` to
fetch data.

It causes watcher not registering to possible `WatcherRemovalManager`,
so leaking in `WatcherRemoveCuratorFramework::removeWatchers`.
  • Loading branch information
kezhuw committed Oct 28, 2024
1 parent 07583f8 commit cd03c72
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,24 @@
public class GetConfigBuilderImpl
implements GetConfigBuilder, BackgroundOperation<Void>, ErrorListenerEnsembleable<byte[]> {
private final CuratorFrameworkImpl client;
private final WatcherRemovalManager watcherRemovalManager;

private Backgrounding backgrounding;
private Watching watching;
private Stat stat;

public GetConfigBuilderImpl(CuratorFrameworkImpl client) {
this.client = (CuratorFrameworkImpl) client.usingNamespace(null);
this.watcherRemovalManager = client.getWatcherRemovalManager();
backgrounding = new Backgrounding();
watching = new Watching(this.client);
watching = new Watching(this.client).setWatcherRemovalManager(watcherRemovalManager);
}

public GetConfigBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding, Watcher watcher, Stat stat) {
this.client = (CuratorFrameworkImpl) client.usingNamespace(null);
this.watcherRemovalManager = client.getWatcherRemovalManager();
this.backgrounding = backgrounding;
this.watching = new Watching(this.client, watcher);
this.watching = new Watching(this.client, watcher).setWatcherRemovalManager(watcherRemovalManager);
this.stat = stat;
}

Expand Down Expand Up @@ -110,19 +113,19 @@ public BackgroundEnsembleable<byte[]> usingWatcher(CuratorWatcher watcher) {

@Override
public BackgroundEnsembleable<byte[]> watched() {
watching = new Watching(client, true);
watching = new Watching(client, true).setWatcherRemovalManager(watcherRemovalManager);
return new InternalBackgroundEnsembleable();
}

@Override
public BackgroundEnsembleable<byte[]> usingWatcher(Watcher watcher) {
watching = new Watching(client, watcher);
watching = new Watching(client, watcher).setWatcherRemovalManager(watcherRemovalManager);
return new InternalBackgroundEnsembleable();
}

@Override
public BackgroundEnsembleable<byte[]> usingWatcher(CuratorWatcher watcher) {
watching = new Watching(client, watcher);
watching = new Watching(client, watcher).setWatcherRemovalManager(watcherRemovalManager);
return new InternalBackgroundEnsembleable();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,46 @@ public class Watching {
private final CuratorWatcher curatorWatcher;
private final boolean watched;
private final CuratorFrameworkImpl client;
private WatcherRemovalManager watcherRemovalManager;
private NamespaceWatcher namespaceWatcher;

public Watching(CuratorFrameworkImpl client, boolean watched) {
this.client = client;
this.watcherRemovalManager = client.getWatcherRemovalManager();
this.watcher = null;
this.curatorWatcher = null;
this.watched = watched;
}

public Watching(CuratorFrameworkImpl client, Watcher watcher) {
this.client = client;
this.watcherRemovalManager = client.getWatcherRemovalManager();
this.watcher = watcher;
this.curatorWatcher = null;
this.watched = false;
}

public Watching(CuratorFrameworkImpl client, CuratorWatcher watcher) {
this.client = client;
this.watcherRemovalManager = client.getWatcherRemovalManager();
this.watcher = null;
this.curatorWatcher = watcher;
this.watched = false;
}

public Watching(CuratorFrameworkImpl client) {
this.client = client;
this.watcherRemovalManager = client.getWatcherRemovalManager();
watcher = null;
watched = false;
curatorWatcher = null;
}

Watching setWatcherRemovalManager(WatcherRemovalManager watcherRemovalManager) {
this.watcherRemovalManager = watcherRemovalManager;
return this;
}

Watcher getWatcher(String unfixedPath) {
namespaceWatcher = null;
if (watcher != null) {
Expand Down Expand Up @@ -85,10 +95,8 @@ void commitWatcher(int rc, boolean isExists) {
doCommit = (rc == KeeperException.Code.OK.intValue());
}

if (doCommit && (namespaceWatcher != null)) {
if (client.getWatcherRemovalManager() != null) {
client.getWatcherRemovalManager().add(namespaceWatcher);
}
if (doCommit && namespaceWatcher != null && watcherRemovalManager != null) {
watcherRemovalManager.add(namespaceWatcher);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
Expand All @@ -34,12 +35,27 @@
import org.apache.curator.test.Timing;
import org.apache.curator.test.WatchersDebug;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.utils.DebugUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestWatcherRemovalManager extends CuratorTestBase {
private static final String superUserPasswordDigest = "curator-test:zghsj3JfJqK7DbWf0RQ1BgbJH9w="; // ran from
private static final String superUserPassword = "curator-test";

@BeforeEach
@Override
public void setup() throws Exception {
System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", superUserPasswordDigest);
super.setup();
}

@Test
public void testSameWatcherDifferentPaths1Triggered() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
Expand Down Expand Up @@ -302,6 +318,54 @@ public void testBasicNamespace3() throws Exception {
}
}

@Test
public void testEnsembleTracker() throws Exception {
// given: client with ensemble tracker
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new RetryOneTime(1))
.namespace("hey")
.ensembleTracker(true)
.authorization("digest", superUserPassword.getBytes())
.build();
try {
client.start();

// We are using standalone, so "/zookeeper/config" will be empty.
// So let's set it directly.
QuorumMaj quorumMaj = new QuorumMaj(Collections.singletonMap(
1L,
new QuorumPeer.QuorumServer(1, "127.0.0.1:2182:2183:participant;" + server.getConnectString())));
quorumMaj.setVersion(1);
client.usingNamespace(null)
.setData()
.forPath(ZooDefs.CONFIG_NODE, quorumMaj.toString().getBytes());

// when: zookeeper config node data fetched
while (client.getCurrentConfig().getVersion() == 0) {
Thread.sleep(100);
}

// then: the watcher must be attached
assertEquals(
1,
WatchersDebug.getDataWatches(client.getZookeeperClient().getZooKeeper())
.size());

// when: ensemble tracker closed
System.setProperty(DebugUtils.PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND, "true");
((CuratorFrameworkImpl) client).getEnsembleTracker().close();

// then: the watcher must be removed
assertEquals(
0,
WatchersDebug.getDataWatches(client.getZookeeperClient().getZooKeeper())
.size());
} finally {
TestCleanState.closeAndTestClean(client);
}
}

@Test
public void testSameWatcher() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
Expand Down

0 comments on commit cd03c72

Please sign in to comment.