Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import javax.security.auth.login.AppConfigurationEntry;

Expand All @@ -40,10 +40,9 @@
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.curator.retry.RetryNTimes;
Expand Down Expand Up @@ -113,7 +112,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
// by default it is still incrementing seq number by 1 each time
public static final int ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT = 1;

private static Logger LOG = LoggerFactory
private static final Logger LOG = LoggerFactory
.getLogger(ZKDelegationTokenSecretManager.class);

private static final String JAAS_LOGIN_ENTRY_NAME =
Expand All @@ -139,10 +138,8 @@ public static void setCurator(CuratorFramework curator) {
protected final CuratorFramework zkClient;
private SharedCount delTokSeqCounter;
private SharedCount keyIdSeqCounter;
private PathChildrenCache keyCache;
private PathChildrenCache tokenCache;
private ExecutorService listenerThreadPool;
private final long shutdownTimeout;
private CuratorCacheBridge keyCache;
private CuratorCacheBridge tokenCache;
private final int seqNumBatchSize;
private int currentSeqNum;
private int currentMaxSeqNum;
Expand All @@ -158,8 +155,6 @@ public ZKDelegationTokenSecretManager(Configuration conf) {
DelegationTokenManager.RENEW_INTERVAL_DEFAULT) * 1000,
conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL,
DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
shutdownTimeout = conf.getLong(ZK_DTSM_ZK_SHUTDOWN_TIMEOUT,
ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED,
Expand Down Expand Up @@ -333,7 +328,6 @@ public void startThreads() throws IOException {
throw new IOException("Could not create namespace", e);
}
}
listenerThreadPool = Executors.newSingleThreadExecutor();
try {
delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0);
if (delTokSeqCounter != null) {
Expand Down Expand Up @@ -363,105 +357,122 @@ public void startThreads() throws IOException {
throw new RuntimeException("Could not create ZK paths");
}
try {
keyCache = new PathChildrenCache(zkClient, ZK_DTSM_MASTER_KEY_ROOT, true);
keyCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_MASTER_KEY_ROOT)
.build();
if (keyCache != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keyCache will never be null

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this was not applicable even before with PathChildrenCache.

keyCache.start(StartMode.BUILD_INITIAL_CACHE);
keyCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event)
throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
processKeyAddOrUpdate(event.getData().getData());
break;
case CHILD_UPDATED:
processKeyAddOrUpdate(event.getData().getData());
break;
case CHILD_REMOVED:
processKeyRemoved(event.getData().getPath());
break;
default:
break;
}
}
}, listenerThreadPool);
CuratorCacheListener keyCacheListener = CuratorCacheListener.builder()
.forCreates(childData -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: there is a forCreatesAndChanges()

try {
processKeyAddOrUpdate(childData.getData());
} catch (IOException e) {
LOG.error("Error while processing Curator keyCacheListener "
+ "NODE_CREATED event");
throw new UncheckedIOException(e);
}
})
.forChanges((oldNode, node) -> {
try {
processKeyAddOrUpdate(node.getData());
} catch (IOException e) {
LOG.error("Error while processing Curator keyCacheListener "
+ "NODE_CHANGED event");
throw new UncheckedIOException(e);
}
})
.forDeletes(childData -> processKeyRemoved(childData.getPath()))
.build();
keyCache.listenable().addListener(keyCacheListener);
keyCache.start();
loadFromZKCache(false);
}
} catch (Exception e) {
throw new IOException("Could not start PathChildrenCache for keys", e);
throw new IOException("Could not start Curator keyCacheListener for keys",
e);
}
if (isTokenWatcherEnabled) {
LOG.info("TokenCache is enabled");
try {
tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true);
tokenCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_TOKENS_ROOT)
.build();
if (tokenCache != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tokenCache will never be null

tokenCache.start(StartMode.BUILD_INITIAL_CACHE);
tokenCache.getListenable().addListener(new PathChildrenCacheListener() {

@Override
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
processTokenAddOrUpdate(event.getData().getData());
break;
case CHILD_UPDATED:
processTokenAddOrUpdate(event.getData().getData());
break;
case CHILD_REMOVED:
processTokenRemoved(event.getData());
break;
default:
break;
}
}
}, listenerThreadPool);
CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder()
.forCreates(childData -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: there is a forCreatesAndChanges()

try {
processTokenAddOrUpdate(childData.getData());
} catch (IOException e) {
LOG.error("Error while processing Curator tokenCacheListener "
+ "NODE_CREATED event");
throw new UncheckedIOException(e);
}
})
.forChanges((oldNode, node) -> {
try {
processTokenAddOrUpdate(node.getData());
} catch (IOException e) {
LOG.error("Error while processing Curator tokenCacheListener "
+ "NODE_CHANGED event");
throw new UncheckedIOException(e);
}
})
.forDeletes(childData -> {
try {
processTokenRemoved(childData);
} catch (IOException e) {
LOG.error("Error while processing Curator tokenCacheListener "
+ "NODE_DELETED event");
throw new UncheckedIOException(e);
}
})
.build();
tokenCache.listenable().addListener(tokenCacheListener);
tokenCache.start();
loadFromZKCache(true);
}
} catch (Exception e) {
throw new IOException("Could not start PathChildrenCache for tokens", e);
throw new IOException(
"Could not start Curator tokenCacheListener for tokens", e);
}
}
super.startThreads();
}

/**
* Load the PathChildrenCache into the in-memory map. Possible caches to be
* Load the CuratorCache into the in-memory map. Possible caches to be
* loaded are keyCache and tokenCache.
*
* @param isTokenCache true if loading tokenCache, false if loading keyCache.
*/
private void loadFromZKCache(final boolean isTokenCache) {
final String cacheName = isTokenCache ? "token" : "key";
LOG.info("Starting to load {} cache.", cacheName);
final List<ChildData> children;
final Stream<ChildData> children;
if (isTokenCache) {
children = tokenCache.getCurrentData();
children = tokenCache.stream();
} else {
children = keyCache.getCurrentData();
children = keyCache.stream();
}

int count = 0;
for (ChildData child : children) {
final AtomicInteger count = new AtomicInteger(0);
children.forEach(childData -> {
try {
if (isTokenCache) {
processTokenAddOrUpdate(child.getData());
processTokenAddOrUpdate(childData.getData());
} else {
processKeyAddOrUpdate(child.getData());
processKeyAddOrUpdate(childData.getData());
}
} catch (Exception e) {
LOG.info("Ignoring node {} because it failed to load.",
child.getPath());
childData.getPath());
LOG.debug("Failure exception:", e);
++count;
count.getAndIncrement();
}
}
});
if (isTokenCache) {
syncTokenOwnerStats();
}
if (count > 0) {
LOG.warn("Ignored {} nodes while loading {} cache.", count, cacheName);
if (count.get() > 0) {
LOG.warn("Ignored {} nodes while loading {} cache.", count.get(),
cacheName);
}
LOG.info("Loaded {} cache.", cacheName);
}
Expand Down Expand Up @@ -550,20 +561,6 @@ public void stopThreads() {
} catch (Exception e) {
LOG.error("Could not stop Curator Framework", e);
}
if (listenerThreadPool != null) {
listenerThreadPool.shutdown();
try {
// wait for existing tasks to terminate
if (!listenerThreadPool.awaitTermination(shutdownTimeout,
TimeUnit.MILLISECONDS)) {
LOG.error("Forcing Listener threadPool to shutdown !!");
listenerThreadPool.shutdownNow();
}
} catch (InterruptedException ie) {
listenerThreadPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

private void createPersistentNode(String nodePath) throws Exception {
Expand Down Expand Up @@ -992,11 +989,6 @@ static String getNodePath(String root, String nodeName) {
return (root + "/" + nodeName);
}

@VisibleForTesting
public ExecutorService getListenerThreadPool() {
return listenerThreadPool;
}

@VisibleForTesting
DelegationTokenInformation getTokenInfoFromMemory(TokenIdent ident) {
return currentTokens.get(ident);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand Down Expand Up @@ -318,19 +316,13 @@ public void testCancelTokenSingleManager() throws Exception {
@SuppressWarnings("rawtypes")
protected void verifyDestroy(DelegationTokenManager tm, Configuration conf)
throws Exception {
AbstractDelegationTokenSecretManager sm =
tm.getDelegationTokenSecretManager();
ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager) sm;
ExecutorService es = zksm.getListenerThreadPool();
tm.destroy();
Assert.assertTrue(es.isShutdown());
// wait for the pool to terminate
long timeout =
conf.getLong(
ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT,
ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
Thread.sleep(timeout * 3);
Assert.assertTrue(es.isTerminated());
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand All @@ -357,17 +349,6 @@ public void testStopThreads() throws Exception {
(Token<DelegationTokenIdentifier>)
tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
Assert.assertNotNull(token);

AbstractDelegationTokenSecretManager sm = tm1.getDelegationTokenSecretManager();
ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager)sm;
ExecutorService es = zksm.getListenerThreadPool();
es.submit(new Callable<Void>() {
public Void call() throws Exception {
Thread.sleep(shutdownTimeoutMillis * 2); // force this to be shutdownNow
return null;
}
});

tm1.destroy();
}

Expand Down
Loading