Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -44,37 +45,45 @@ public class DefaultSecretKeyVerifierClient implements SecretKeyVerifierClient {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultSecretKeyVerifierClient.class);

private final LoadingCache<UUID, ManagedSecretKey> cache;
private final LoadingCache<UUID, Optional<ManagedSecretKey>> cache;

DefaultSecretKeyVerifierClient(SCMSecurityProtocol scmSecurityProtocol,
ConfigurationSource conf) {
Duration expiryDuration = parseExpiryDuration(conf);
Duration rotateDuration = parseRotateDuration(conf);
long cacheSize = expiryDuration.toMillis() / rotateDuration.toMillis() + 1;

CacheLoader<UUID, ManagedSecretKey> loader =
new CacheLoader<UUID, ManagedSecretKey>() {
// if rotation is 1d, and each keys is valid for 7d before expiring,
// the expected number valid keys at any time is 7.
long expectedValidKeys =
expiryDuration.toMillis() / rotateDuration.toMillis() + 1;
// However, we want to cache some expired keys as well, to avoid requesting
// SCM for recently expire secret keys.
long cacheSize = expectedValidKeys * 2;
Duration cacheExpiry = expiryDuration.multipliedBy(2);

CacheLoader<UUID, Optional<ManagedSecretKey>> loader =
new CacheLoader<UUID, Optional<ManagedSecretKey>>() {
@Override
public ManagedSecretKey load(UUID id) throws Exception {
public Optional<ManagedSecretKey> load(UUID id) throws Exception {
ManagedSecretKey secretKey = scmSecurityProtocol.getSecretKey(id);
LOG.info("Secret key fetched from SCM: {}", secretKey);
return secretKey;
return Optional.ofNullable(secretKey);
}
};

LOG.info("Initializing secret key cache with size {}, TTL {}",
cacheSize, expiryDuration);
cache = CacheBuilder.newBuilder()
.maximumSize(cacheSize)
.expireAfterWrite(expiryDuration.toMillis(), TimeUnit.MILLISECONDS)
.expireAfterWrite(cacheExpiry.toMillis(), TimeUnit.MILLISECONDS)
.recordStats()
.build(loader);
}

@Override
public ManagedSecretKey getSecretKey(UUID id) throws SCMSecurityException {
try {
return cache.get(id);
return cache.get(id).orElse(null);
} catch (ExecutionException e) {
// handle cache load exception.
if (e.getCause() instanceof IOException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import org.apache.hadoop.hdds.security.exception.SCMSecurityException;

import javax.annotation.Nullable;
import java.util.UUID;

/**
* Define the client-side API that the token verifiers (or datanodes) use to
* retrieve the relevant secret key to validate token authority.
*/
public interface SecretKeyVerifierClient {
@Nullable
ManagedSecretKey getSecretKey(UUID id) throws SCMSecurityException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private void verifyTokenPassword(
ManagedSecretKey secretKey = secretKeyClient.getSecretKey(
tokenId.getSecretKeyId());
if (secretKey == null) {
throw new BlockTokenException("Can't find the signer secret key " +
throw new BlockTokenException("Can't find the signing secret key " +
tokenId.getSecretKeyId() + " of the token for user: " +
tokenId.getUser());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void rejectsTokenWithInvalidSecretId() throws Exception {
BlockTokenException ex = assertThrows(BlockTokenException.class, () ->
subject.verify("anyUser", token, cmd));
assertThat(ex.getMessage(),
containsString("Can't find the signer secret key"));
containsString("Can't find the signing secret key"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ private static List<BlockExtendedInputStream> createStreams(
// BlockInputStream is only created here and not initialized. The
// BlockInputStream is initialized when a read operation is performed on
// the block for the first time.
Function<BlockID, BlockLocationInfo> retry;
if (retryFunction != null) {
retry = keyBlockID -> {
OmKeyInfo newKeyInfo = retryFunction.apply(keyInfo);
return getBlockLocationInfo(newKeyInfo,
omKeyLocationInfo.getBlockID());
};
} else {
retry = null;
}

BlockExtendedInputStream stream =
blockStreamFactory.create(
keyInfo.getReplicationConfig(),
Expand All @@ -82,11 +93,7 @@ private static List<BlockExtendedInputStream> createStreams(
omKeyLocationInfo.getToken(),
verifyChecksum,
xceiverClientFactory,
keyBlockID -> {
OmKeyInfo newKeyInfo = retryFunction.apply(keyInfo);
return getBlockLocationInfo(newKeyInfo,
omKeyLocationInfo.getBlockID());
});
retry);
partStreams.add(stream);
}
return partStreams;
Expand Down
Loading