diff --git a/dev-support/test-patch.sh b/dev-support/test-patch.sh
index cbeb81987e70d..3905c5a63b4de 100755
--- a/dev-support/test-patch.sh
+++ b/dev-support/test-patch.sh
@@ -454,7 +454,7 @@ checkJavadocWarnings () {
JIRA_COMMENT="$JIRA_COMMENT
{color:red}-1 javadoc{color}. The javadoc tool appears to have generated `expr $(($numPatchJavadocWarnings-$numTrunkJavadocWarnings))` warning messages.
- See $BUILD_URL/artifact/trunk/patchprocess/diffJavadocWarnings.txt for details."
+ See $BUILD_URL/artifact/PreCommit-HADOOP-Build-patchprocess/diffJavadocWarnings.txt for details."
return 1
fi
fi
@@ -498,7 +498,7 @@ checkJavacWarnings () {
{color:red}-1 javac{color}. The applied patch generated $patchJavacWarnings javac compiler warnings (more than the trunk's current $trunkJavacWarnings warnings)."
$DIFF $PATCH_DIR/filteredTrunkJavacWarnings.txt $PATCH_DIR/filteredPatchJavacWarnings.txt > $PATCH_DIR/diffJavacWarnings.txt
- JIRA_COMMENT_FOOTER="Javac warnings: $BUILD_URL/artifact/trunk/patchprocess/diffJavacWarnings.txt
+ JIRA_COMMENT_FOOTER="Javac warnings: $BUILD_URL/artifact/PreCommit-HADOOP-Build-patchprocess/diffJavacWarnings.txt
$JIRA_COMMENT_FOOTER"
return 1
@@ -540,7 +540,7 @@ checkReleaseAuditWarnings () {
{color:red}-1 release audit{color}. The applied patch generated $patchReleaseAuditWarnings release audit warnings."
$GREP '\!?????' $PATCH_DIR/patchReleaseAuditWarnings.txt > $PATCH_DIR/patchReleaseAuditProblems.txt
echo "Lines that start with ????? in the release audit report indicate files that do not have an Apache license header." >> $PATCH_DIR/patchReleaseAuditProblems.txt
- JIRA_COMMENT_FOOTER="Release audit warnings: $BUILD_URL/artifact/trunk/patchprocess/patchReleaseAuditProblems.txt
+ JIRA_COMMENT_FOOTER="Release audit warnings: $BUILD_URL/artifact/PreCommit-HADOOP-Build-patchprocess/patchReleaseAuditProblems.txt
$JIRA_COMMENT_FOOTER"
return 1
fi
@@ -659,7 +659,7 @@ checkFindbugsWarnings () {
$PATCH_DIR/newPatchFindbugsWarnings${module_suffix}.xml \
$PATCH_DIR/newPatchFindbugsWarnings${module_suffix}.html
if [[ $newFindbugsWarnings > 0 ]] ; then
- JIRA_COMMENT_FOOTER="Findbugs warnings: $BUILD_URL/artifact/trunk/patchprocess/newPatchFindbugsWarnings${module_suffix}.html
+ JIRA_COMMENT_FOOTER="Findbugs warnings: $BUILD_URL/artifact/PreCommit-HADOOP-Build-patchprocess/newPatchFindbugsWarnings${module_suffix}.html
$JIRA_COMMENT_FOOTER"
fi
done
diff --git a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZKSignerSecretProvider.java b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZKSignerSecretProvider.java
index a17b6d495dc58..6c0fbbb0a26ee 100644
--- a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZKSignerSecretProvider.java
+++ b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZKSignerSecretProvider.java
@@ -197,6 +197,8 @@ public void init(Properties config, ServletContext servletContext,
client = (CuratorFramework) curatorClientObj;
} else {
client = createCuratorClient(config);
+ servletContext.setAttribute(
+ ZOOKEEPER_SIGNER_SECRET_PROVIDER_CURATOR_CLIENT_ATTRIBUTE, client);
}
this.tokenValidity = tokenValidity;
shouldDisconnect = Boolean.parseBoolean(
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index f0fcab5580f51..e99a19d9e3b70 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -530,6 +530,16 @@ Release 2.6.0 - UNRELEASED
HADOOP-10922. User documentation for CredentialShell. (Larry McCay via wang)
+ HADOOP-11016. KMS should support signing cookies with zookeeper secret
+ manager. (tucu)
+
+ HADOOP-11106. Document considerations of HAR and Encryption. (clamb via wang)
+
+ HADOOP-10970. Cleanup KMS configuration keys. (wang)
+
+ HADOOP-11017. KMS delegation token secret manager should be able to use
+ zookeeper as store. (asuresh via tucu)
+
OPTIMIZATIONS
HADOOP-10838. Byte array native checksumming. (James Thomas via todd)
@@ -584,6 +594,10 @@ Release 2.6.0 - UNRELEASED
HADOOP-10833. Remove unused cache in UserProvider. (Benoy Antony)
+ HADOOP-11112. TestKMSWithZK does not use KEY_PROVIDER_URI. (tucu via wang)
+
+ HADOOP-11111. MiniKDC to use locale EN_US for case conversions. (stevel)
+
BUG FIXES
HADOOP-10781. Unportable getgrouplist() usage breaks FreeBSD (Dmitry
@@ -721,8 +735,8 @@ Release 2.6.0 - UNRELEASED
HADOOP-11056. OsSecureRandom.setConf() might leak file descriptors (yzhang
via cmccabe)
- HDFS-6912. SharedFileDescriptorFactory should not allocate sparse files
- (cmccabe)
+ HADOOP-11040. Return value of read(ByteBuffer buf) in CryptoInputStream is
+ incorrect in some cases. (Yi Liu via wang)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
@@ -824,6 +838,21 @@ Release 2.6.0 - UNRELEASED
HADOOP-11062. CryptoCodec testcases requiring OpenSSL should be run
only if -Pnative is used. (asuresh via tucu)
+ HADOOP-11099. KMS return HTTP UNAUTHORIZED 401 on ACL failure. (tucu)
+
+ HADOOP-11105. MetricsSystemImpl could leak memory in registered callbacks.
+ (Chuan Liu via cnauroth)
+
+ HADOOP-10982. KMS: Support for multiple Kerberos principals. (tucu)
+
+ HADOOP-11109. Site build is broken. (Jian He via atm)
+
+ HADOOP-10946. Fix a bunch of typos in log messages (Ray Chiang via aw)
+
+ HADOOP-10131. NetWorkTopology#countNumOfAvailableNodes() is returning
+ wrong value if excluded nodes passed are not part of the cluster tree
+ (vinayakumarb)
+
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 0183e292c8a5d..32e95258a1068 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -218,6 +218,19 @@
com.jcraft
jsch
+
+ org.apache.curator
+ curator-test
+ test
+
+
+ org.apache.curator
+ curator-client
+
+
+ org.apache.curator
+ curator-recipes
+
com.google.code.findbugs
jsr305
diff --git a/hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh b/hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh
index 754601a19db75..202098d9774fe 100644
--- a/hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh
+++ b/hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh
@@ -328,6 +328,15 @@ esac
#
# export HADOOP_BALANCER_OPTS=""
+###
+# HDFS Mover specific parameters
+###
+# Specify the JVM options to be used when starting the HDFS Mover.
+# These options will be appended to the options specified as HADOOP_OPTS
+# and therefore may override any similar flags set in HADOOP_OPTS
+#
+# export HADOOP_MOVER_OPTS=""
+
###
# Advanced Users Only!
###
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
index e8964ed6ed5b3..68e969737c5e2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
@@ -471,7 +471,16 @@ public int read(ByteBuffer buf) throws IOException {
streamOffset += n; // Read n bytes
decrypt(buf, n, pos);
}
- return n;
+
+ if (n >= 0) {
+ return unread + n;
+ } else {
+ if (unread == 0) {
+ return -1;
+ } else {
+ return unread;
+ }
+ }
}
throw new UnsupportedOperationException("ByteBuffer read unsupported " +
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java
index 6ca0425b5517d..ce99d795f35ce 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java
@@ -46,7 +46,8 @@ public abstract KeyProvider createProvider(URI providerName,
) throws IOException;
private static final ServiceLoader serviceLoader =
- ServiceLoader.load(KeyProviderFactory.class);
+ ServiceLoader.load(KeyProviderFactory.class,
+ KeyProviderFactory.class.getClassLoader());
// Iterate through the serviceLoader to avoid lazy loading.
// Lazy loading would require synchronization in concurrent use cases.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
index 899b6c44dc795..a97463ac88156 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
@@ -45,6 +45,7 @@
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
+import java.lang.reflect.UndeclaredThrowableException;
import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.net.URI;
@@ -400,6 +401,8 @@ public HttpURLConnection run() throws Exception {
});
} catch (IOException ex) {
throw ex;
+ } catch (UndeclaredThrowableException ex) {
+ throw new IOException(ex.getUndeclaredThrowable());
} catch (Exception ex) {
throw new IOException(ex);
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileEncryptionInfo.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileEncryptionInfo.java
index f960233fb7825..641709d98a36e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileEncryptionInfo.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileEncryptionInfo.java
@@ -34,6 +34,7 @@ public class FileEncryptionInfo {
private final CipherSuite cipherSuite;
private final byte[] edek;
private final byte[] iv;
+ private final String keyName;
private final String ezKeyVersionName;
/**
@@ -42,14 +43,16 @@ public class FileEncryptionInfo {
* @param suite CipherSuite used to encrypt the file
* @param edek encrypted data encryption key (EDEK) of the file
* @param iv initialization vector (IV) used to encrypt the file
+ * @param keyName name of the key used for the encryption zone
* @param ezKeyVersionName name of the KeyVersion used to encrypt the
* encrypted data encryption key.
*/
public FileEncryptionInfo(final CipherSuite suite, final byte[] edek,
- final byte[] iv, final String ezKeyVersionName) {
+ final byte[] iv, final String keyName, final String ezKeyVersionName) {
checkNotNull(suite);
checkNotNull(edek);
checkNotNull(iv);
+ checkNotNull(keyName);
checkNotNull(ezKeyVersionName);
checkArgument(edek.length == suite.getAlgorithmBlockSize(),
"Unexpected key length");
@@ -58,6 +61,7 @@ public FileEncryptionInfo(final CipherSuite suite, final byte[] edek,
this.cipherSuite = suite;
this.edek = edek;
this.iv = iv;
+ this.keyName = keyName;
this.ezKeyVersionName = ezKeyVersionName;
}
@@ -83,6 +87,11 @@ public byte[] getIV() {
return iv;
}
+ /**
+ * @return name of the encryption zone key.
+ */
+ public String getKeyName() { return keyName; }
+
/**
* @return name of the encryption zone KeyVersion used to encrypt the
* encrypted data encryption key (EDEK).
@@ -95,6 +104,7 @@ public String toString() {
builder.append("cipherSuite: " + cipherSuite);
builder.append(", edek: " + Hex.encodeHexString(edek));
builder.append(", iv: " + Hex.encodeHexString(iv));
+ builder.append(", keyName: " + keyName);
builder.append(", ezKeyVersionName: " + ezKeyVersionName);
builder.append("}");
return builder.toString();
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileStatus.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileStatus.java
index b261f7fdedf4b..da3807d307355 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileStatus.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileStatus.java
@@ -200,6 +200,15 @@ public long getAccessTime() {
public FsPermission getPermission() {
return permission;
}
+
+ /**
+ * Tell whether the underlying file or directory is encrypted or not.
+ *
+ * @return true if the underlying file is encrypted.
+ */
+ public boolean isEncrypted() {
+ return permission.getEncryptedBit();
+ }
/**
* Get the owner of the file.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java
index ee84437d8e17d..264a095270632 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/permission/FsPermission.java
@@ -294,6 +294,13 @@ public boolean getAclBit() {
return false;
}
+ /**
+ * Returns true if the file is encrypted or the directory is in an encryption zone
+ */
+ public boolean getEncryptedBit() {
+ return false;
+ }
+
/** Set the user file creation mask (umask) */
public static void setUMask(Configuration conf, FsPermission umask) {
conf.set(UMASK_LABEL, String.format("%1$03o", umask.toShort()));
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/VersionMismatchException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/VersionMismatchException.java
index a72be58832dd4..015c15e8a6d16 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/VersionMismatchException.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/VersionMismatchException.java
@@ -41,7 +41,7 @@ public VersionMismatchException(byte expectedVersionIn, byte foundVersionIn){
/** Returns a string representation of this object. */
@Override
public String toString(){
- return "A record version mismatch occured. Expecting v"
+ return "A record version mismatch occurred. Expecting v"
+ expectedVersion + ", found v" + foundVersion;
}
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 2f482c290edc2..84fe5523ec872 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -687,7 +687,8 @@ public Object run() throws IOException, InterruptedException {
* a header to the server and starts
* the connection thread that waits for responses.
*/
- private synchronized void setupIOstreams() {
+ private synchronized void setupIOstreams(
+ AtomicBoolean fallbackToSimpleAuth) {
if (socket != null || shouldCloseConnection.get()) {
return;
}
@@ -738,11 +739,18 @@ public AuthMethod run()
remoteId.saslQop =
(String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
LOG.debug("Negotiated QOP is :" + remoteId.saslQop);
- } else if (UserGroupInformation.isSecurityEnabled() &&
- !fallbackAllowed) {
- throw new IOException("Server asks us to fall back to SIMPLE " +
- "auth, but this client is configured to only allow secure " +
- "connections.");
+ if (fallbackToSimpleAuth != null) {
+ fallbackToSimpleAuth.set(false);
+ }
+ } else if (UserGroupInformation.isSecurityEnabled()) {
+ if (!fallbackAllowed) {
+ throw new IOException("Server asks us to fall back to SIMPLE " +
+ "auth, but this client is configured to only allow secure " +
+ "connections.");
+ }
+ if (fallbackToSimpleAuth != null) {
+ fallbackToSimpleAuth.set(true);
+ }
}
}
@@ -1375,6 +1383,26 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
/**
* Make a call, passing rpcRequest, to the IPC server defined by
* remoteId, returning the rpc respond.
+ *
+ * @param rpcKind
+ * @param rpcRequest - contains serialized method and method parameters
+ * @param remoteId - the target rpc server
+ * @param fallbackToSimpleAuth - set to true or false during this method to
+ * indicate if a secure client falls back to simple auth
+ * @return the rpc response
+ * Throws exceptions if there are network problems or if the remote code
+ * threw an exception.
+ */
+ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
+ ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth)
+ throws IOException {
+ return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,
+ fallbackToSimpleAuth);
+ }
+
+ /**
+ * Make a call, passing rpcRequest, to the IPC server defined by
+ * remoteId, returning the rpc response.
*
* @param rpcKind
* @param rpcRequest - contains serialized method and method parameters
@@ -1386,8 +1414,29 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass) throws IOException {
+ return call(rpcKind, rpcRequest, remoteId, serviceClass, null);
+ }
+
+ /**
+ * Make a call, passing rpcRequest, to the IPC server defined by
+ * remoteId, returning the rpc response.
+ *
+ * @param rpcKind
+ * @param rpcRequest - contains serialized method and method parameters
+ * @param remoteId - the target rpc server
+ * @param serviceClass - service class for RPC
+ * @param fallbackToSimpleAuth - set to true or false during this method to
+ * indicate if a secure client falls back to simple auth
+ * @return the rpc response
+ * Throws exceptions if there are network problems or if the remote code
+ * threw an exception.
+ */
+ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
+ ConnectionId remoteId, int serviceClass,
+ AtomicBoolean fallbackToSimpleAuth) throws IOException {
final Call call = createCall(rpcKind, rpcRequest);
- Connection connection = getConnection(remoteId, call, serviceClass);
+ Connection connection = getConnection(remoteId, call, serviceClass,
+ fallbackToSimpleAuth);
try {
connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) {
@@ -1444,7 +1493,8 @@ Set getConnectionIds() {
/** Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given ConnectionId are reused. */
private Connection getConnection(ConnectionId remoteId,
- Call call, int serviceClass) throws IOException {
+ Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
+ throws IOException {
if (!running.get()) {
// the client is stopped
throw new IOException("The client is stopped");
@@ -1468,7 +1518,7 @@ private Connection getConnection(ConnectionId remoteId,
//block above. The reason for that is if the server happens to be slow,
//it will take longer to establish a connection and that will slow the
//entire system down.
- connection.setupIOstreams();
+ connection.setupIOstreams(fallbackToSimpleAuth);
return connection;
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 0ccdb71d0ee1e..124d835ab15f8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -27,6 +27,7 @@
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
@@ -84,14 +85,23 @@ public ProtocolProxy getProxy(Class protocol, long clientVersion,
}
@Override
- @SuppressWarnings("unchecked")
public ProtocolProxy getProxy(Class protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
) throws IOException {
+ return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+ rpcTimeout, connectionRetryPolicy, null);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public ProtocolProxy getProxy(Class protocol, long clientVersion,
+ InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+ SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
+ AtomicBoolean fallbackToSimpleAuth) throws IOException {
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
- rpcTimeout, connectionRetryPolicy);
+ rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
return new ProtocolProxy(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
}
@@ -115,13 +125,16 @@ private static class Invoker implements RpcInvocationHandler {
private final Client client;
private final long clientProtocolVersion;
private final String protocolName;
+ private AtomicBoolean fallbackToSimpleAuth;
private Invoker(Class> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
- int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException {
+ int rpcTimeout, RetryPolicy connectionRetryPolicy,
+ AtomicBoolean fallbackToSimpleAuth) throws IOException {
this(protocol, Client.ConnectionId.getConnectionId(
addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
conf, factory);
+ this.fallbackToSimpleAuth = fallbackToSimpleAuth;
}
/**
@@ -217,7 +230,8 @@ public Object invoke(Object proxy, Method method, Object[] args)
final RpcResponseWrapper val;
try {
val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
- new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId);
+ new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
+ fallbackToSimpleAuth);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index 4ae7956c68e52..40f6515e4a04d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
@@ -524,6 +525,7 @@ public static T getProxy(Class protocol,
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout max time for each rpc; 0 means no timeout
+ * @param connectionRetryPolicy retry policy
* @return the proxy
* @throws IOException if any error occurs
*/
@@ -535,11 +537,43 @@ public static ProtocolProxy getProtocolProxy(Class protocol,
SocketFactory factory,
int rpcTimeout,
RetryPolicy connectionRetryPolicy) throws IOException {
+ return getProtocolProxy(protocol, clientVersion, addr, ticket,
+ conf, factory, rpcTimeout, connectionRetryPolicy, null);
+ }
+
+ /**
+ * Get a protocol proxy that contains a proxy connection to a remote server
+ * and a set of methods that are supported by the server
+ *
+ * @param protocol protocol
+ * @param clientVersion client's version
+ * @param addr server address
+ * @param ticket security ticket
+ * @param conf configuration
+ * @param factory socket factory
+ * @param rpcTimeout max time for each rpc; 0 means no timeout
+ * @param connectionRetryPolicy retry policy
+ * @param fallbackToSimpleAuth set to true or false during calls to indicate if
+ * a secure client falls back to simple auth
+ * @return the proxy
+ * @throws IOException if any error occurs
+ */
+ public static ProtocolProxy getProtocolProxy(Class protocol,
+ long clientVersion,
+ InetSocketAddress addr,
+ UserGroupInformation ticket,
+ Configuration conf,
+ SocketFactory factory,
+ int rpcTimeout,
+ RetryPolicy connectionRetryPolicy,
+ AtomicBoolean fallbackToSimpleAuth)
+ throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
- return getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion,
- addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy);
+ return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
+ addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
+ fallbackToSimpleAuth);
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
index a8280bd2edffd..047722e649eef 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
@@ -43,6 +44,14 @@ ProtocolProxy getProxy(Class protocol,
SocketFactory factory, int rpcTimeout,
RetryPolicy connectionRetryPolicy) throws IOException;
+ /** Construct a client-side proxy object. */
+ ProtocolProxy getProxy(Class protocol,
+ long clientVersion, InetSocketAddress addr,
+ UserGroupInformation ticket, Configuration conf,
+ SocketFactory factory, int rpcTimeout,
+ RetryPolicy connectionRetryPolicy,
+ AtomicBoolean fallbackToSimpleAuth) throws IOException;
+
/**
* Construct a server for a protocol implementation instance.
*
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
index 4b2dfe0de1009..c2d9435908aab 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
@@ -24,6 +24,7 @@
import java.net.InetSocketAddress;
import java.io.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
@@ -212,14 +213,17 @@ private static class Invoker implements RpcInvocationHandler {
private Client.ConnectionId remoteId;
private Client client;
private boolean isClosed = false;
+ private final AtomicBoolean fallbackToSimpleAuth;
public Invoker(Class> protocol,
InetSocketAddress address, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
- int rpcTimeout) throws IOException {
+ int rpcTimeout, AtomicBoolean fallbackToSimpleAuth)
+ throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
ticket, rpcTimeout, conf);
this.client = CLIENTS.getClient(conf, factory);
+ this.fallbackToSimpleAuth = fallbackToSimpleAuth;
}
@Override
@@ -238,7 +242,8 @@ public Object invoke(Object proxy, Method method, Object[] args)
ObjectWritable value;
try {
value = (ObjectWritable)
- client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
+ client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
+ remoteId, fallbackToSimpleAuth);
} finally {
if (traceScope != null) traceScope.close();
}
@@ -275,11 +280,25 @@ static Client getClient(Configuration conf) {
* talking to a server at the named address.
* @param */
@Override
- @SuppressWarnings("unchecked")
public ProtocolProxy getProxy(Class protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory,
int rpcTimeout, RetryPolicy connectionRetryPolicy)
+ throws IOException {
+ return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+ rpcTimeout, connectionRetryPolicy, null);
+ }
+
+ /** Construct a client-side proxy object that implements the named protocol,
+ * talking to a server at the named address.
+ * @param */
+ @Override
+ @SuppressWarnings("unchecked")
+ public ProtocolProxy getProxy(Class protocol, long clientVersion,
+ InetSocketAddress addr, UserGroupInformation ticket,
+ Configuration conf, SocketFactory factory,
+ int rpcTimeout, RetryPolicy connectionRetryPolicy,
+ AtomicBoolean fallbackToSimpleAuth)
throws IOException {
if (connectionRetryPolicy != null) {
@@ -289,7 +308,7 @@ public ProtocolProxy getProxy(Class protocol, long clientVersion,
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
- factory, rpcTimeout));
+ factory, rpcTimeout, fallbackToSimpleAuth));
return new ProtocolProxy(protocol, proxy, true);
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/util/MetricsDynamicMBeanBase.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/util/MetricsDynamicMBeanBase.java
index 57014d5781b30..9c9164eaf4369 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/util/MetricsDynamicMBeanBase.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/util/MetricsDynamicMBeanBase.java
@@ -160,7 +160,7 @@ else if (attributeName.endsWith(MIN_TIME))
else if (attributeName.endsWith(MAX_TIME))
return or.getMaxTime();
else {
- MetricsUtil.LOG.error("Unexpected attrubute suffix");
+ MetricsUtil.LOG.error("Unexpected attribute suffix");
throw new AttributeNotFoundException();
}
} else {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
index 722abd95c4ae2..2107e68895b52 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
@@ -83,7 +83,12 @@ enum InitMode { NORMAL, STANDBY }
private final Map allSources;
private final Map sinks;
private final Map allSinks;
+
+ // The callback list is used by register(Callback callback), while
+ // the callback map is used by register(String name, String desc, T sink)
private final List callbacks;
+ private final Map namedCallbacks;
+
private final MetricsCollectorImpl collector;
private final MetricsRegistry registry = new MetricsRegistry(MS_NAME);
@Metric({"Snapshot", "Snapshot stats"}) MutableStat snapshotStat;
@@ -119,6 +124,7 @@ public MetricsSystemImpl(String prefix) {
sourceConfigs = Maps.newHashMap();
sinkConfigs = Maps.newHashMap();
callbacks = Lists.newArrayList();
+ namedCallbacks = Maps.newHashMap();
injectedTags = Lists.newArrayList();
collector = new MetricsCollectorImpl();
if (prefix != null) {
@@ -178,11 +184,13 @@ public synchronized void start() {
return;
}
for (Callback cb : callbacks) cb.preStart();
+ for (Callback cb : namedCallbacks.values()) cb.preStart();
configure(prefix);
startTimer();
monitoring = true;
LOG.info(prefix +" metrics system started");
for (Callback cb : callbacks) cb.postStart();
+ for (Callback cb : namedCallbacks.values()) cb.postStart();
}
@Override
@@ -198,6 +206,7 @@ public synchronized void stop() {
return;
}
for (Callback cb : callbacks) cb.preStop();
+ for (Callback cb : namedCallbacks.values()) cb.preStop();
LOG.info("Stopping "+ prefix +" metrics system...");
stopTimer();
stopSources();
@@ -206,6 +215,7 @@ public synchronized void stop() {
monitoring = false;
LOG.info(prefix +" metrics system stopped.");
for (Callback cb : callbacks) cb.postStop();
+ for (Callback cb : namedCallbacks.values()) cb.postStop();
}
@Override public synchronized
@@ -224,7 +234,7 @@ T register(String name, String desc, T source) {
}
// We want to re-register the source to pick up new config when the
// metrics system restarts.
- register(new AbstractCallback() {
+ register(name, new AbstractCallback() {
@Override public void postStart() {
registerSource(finalName, finalDesc, s);
}
@@ -241,6 +251,9 @@ void unregisterSource(String name) {
if (allSources.containsKey(name)) {
allSources.remove(name);
}
+ if (namedCallbacks.containsKey(name)) {
+ namedCallbacks.remove(name);
+ }
}
synchronized
@@ -268,7 +281,7 @@ T register(final String name, final String description, final T sink) {
}
// We want to re-register the sink to pick up new config
// when the metrics system restarts.
- register(new AbstractCallback() {
+ register(name, new AbstractCallback() {
@Override public void postStart() {
register(name, description, sink);
}
@@ -289,9 +302,16 @@ synchronized void registerSink(String name, String desc, MetricsSink sink) {
@Override
public synchronized void register(final Callback callback) {
- callbacks.add((Callback) Proxy.newProxyInstance(
- callback.getClass().getClassLoader(), new Class>[] { Callback.class },
- new InvocationHandler() {
+ callbacks.add((Callback) getProxyForCallback(callback));
+ }
+
+ private synchronized void register(String name, final Callback callback) {
+ namedCallbacks.put(name, (Callback) getProxyForCallback(callback));
+ }
+
+ private Object getProxyForCallback(final Callback callback) {
+ return Proxy.newProxyInstance(callback.getClass().getClassLoader(),
+ new Class>[] { Callback.class }, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
@@ -299,11 +319,11 @@ public Object invoke(Object proxy, Method method, Object[] args)
return method.invoke(callback, args);
} catch (Exception e) {
// These are not considered fatal.
- LOG.warn("Caught exception in callback "+ method.getName(), e);
+ LOG.warn("Caught exception in callback " + method.getName(), e);
}
return null;
}
- }));
+ });
}
@Override
@@ -572,6 +592,7 @@ public synchronized boolean shutdown() {
allSources.clear();
allSinks.clear();
callbacks.clear();
+ namedCallbacks.clear();
if (mbeanName != null) {
MBeans.unregister(mbeanName);
mbeanName = null;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index 6ee6db769ad1c..5f11367d4914b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -26,6 +26,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -689,6 +690,12 @@ protected Random getRandom() {
return rand;
}
+ @VisibleForTesting
+ void setRandomSeed(long seed) {
+ Random rand = getRandom();
+ rand.setSeed(seed);
+ }
+
/** randomly choose one node from scope
* if scope starts with ~, choose one from the all nodes except for the
* ones in scope; otherwise, choose one from scope
@@ -775,25 +782,35 @@ public int countNumOfAvailableNodes(String scope,
scope=scope.substring(1);
}
scope = NodeBase.normalize(scope);
- int count=0; // the number of nodes in both scope & excludedNodes
+ int excludedCountInScope = 0; // the number of nodes in both scope & excludedNodes
+ int excludedCountOffScope = 0; // the number of nodes outside scope & excludedNodes
netlock.readLock().lock();
try {
- for(Node node:excludedNodes) {
- if ((NodeBase.getPath(node)+NodeBase.PATH_SEPARATOR_STR).
- startsWith(scope+NodeBase.PATH_SEPARATOR_STR)) {
- count++;
+ for (Node node : excludedNodes) {
+ node = getNode(NodeBase.getPath(node));
+ if (node == null) {
+ continue;
+ }
+ if ((NodeBase.getPath(node) + NodeBase.PATH_SEPARATOR_STR)
+ .startsWith(scope + NodeBase.PATH_SEPARATOR_STR)) {
+ excludedCountInScope++;
+ } else {
+ excludedCountOffScope++;
}
}
- Node n=getNode(scope);
- int scopeNodeCount=1;
+ Node n = getNode(scope);
+ int scopeNodeCount = 0;
+ if (n != null) {
+ scopeNodeCount++;
+ }
if (n instanceof InnerNode) {
scopeNodeCount=((InnerNode)n).getNumOfLeaves();
}
if (isExcluded) {
- return clusterMap.getNumOfLeaves()-
- scopeNodeCount-excludedNodes.size()+count;
+ return clusterMap.getNumOfLeaves() - scopeNodeCount
+ - excludedCountOffScope;
} else {
- return scopeNodeCount-count;
+ return scopeNodeCount - excludedCountInScope;
}
} finally {
netlock.readLock().unlock();
@@ -870,21 +887,19 @@ protected int getWeight(Node reader, Node node) {
/**
* Sort nodes array by network distance to reader.
*
- * In a three-level topology, a node can be either local, on the same rack, or
- * on a different rack from the reader. Sorting the nodes based on network
- * distance from the reader reduces network traffic and improves performance.
+ * In a three-level topology, a node can be either local, on the same rack,
+ * or on a different rack from the reader. Sorting the nodes based on network
+ * distance from the reader reduces network traffic and improves
+ * performance.
*
* As an additional twist, we also randomize the nodes at each network
- * distance using the provided random seed. This helps with load balancing
- * when there is data skew.
- *
- * @param reader Node where data will be read
- * @param nodes Available replicas with the requested data
- * @param seed Used to seed the pseudo-random generator that randomizes the
- * set of nodes at each network distance.
+ * distance. This helps with load balancing when there is data skew.
+ *
+ * @param reader Node where data will be read
+ * @param nodes Available replicas with the requested data
+ * @param activeLen Number of active nodes at the front of the array
*/
- public void sortByDistance(Node reader, Node[] nodes, int activeLen,
- long seed, boolean randomizeBlockLocationsPerBlock) {
+ public void sortByDistance(Node reader, Node[] nodes, int activeLen) {
/** Sort weights for the nodes array */
int[] weights = new int[activeLen];
for (int i=0; i list: tree.values()) {
if (list != null) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
index cc598c0986fb2..13160ebba06f5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
@@ -268,19 +268,17 @@ protected int getWeight(Node reader, Node node) {
/**
* Sort nodes array by their distances to reader.
*
- * This is the same as
- * {@link NetworkTopology#sortByDistance(Node, Node[], long)} except with a
- * four-level network topology which contains the additional network distance
- * of a "node group" which is between local and same rack.
- *
- * @param reader Node where data will be read
- * @param nodes Available replicas with the requested data
- * @param seed Used to seed the pseudo-random generator that randomizes the
- * set of nodes at each network distance.
+ * This is the same as {@link NetworkTopology#sortByDistance(Node, Node[],
+ * int)} except with a four-level network topology which contains the
+ * additional network distance of a "node group" which is between local and
+ * same rack.
+ *
+ * @param reader Node where data will be read
+ * @param nodes Available replicas with the requested data
+ * @param activeLen Number of active nodes at the front of the array
*/
@Override
- public void sortByDistance(Node reader, Node[] nodes, int activeLen,
- long seed, boolean randomizeBlockLocationsPerBlock) {
+ public void sortByDistance(Node reader, Node[] nodes, int activeLen) {
// If reader is not a datanode (not in NetworkTopology tree), we need to
// replace this reader with a sibling leaf node in tree.
if (reader != null && !this.contains(reader)) {
@@ -293,8 +291,7 @@ public void sortByDistance(Node reader, Node[] nodes, int activeLen,
return;
}
}
- super.sortByDistance(reader, nodes, activeLen, seed,
- randomizeBlockLocationsPerBlock);
+ super.sortByDistance(reader, nodes, activeLen);
}
/** InnerNodeWithNodeGroup represents a switch/router of a data center, rack
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
index b9e26b545c363..f5e7bc9c0dee0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
@@ -127,7 +127,7 @@ public void startThreads() throws IOException {
public synchronized void reset() {
currentId = 0;
allKeys.clear();
- delegationTokenSequenceNumber = 0;
+ setDelegationTokenSeqNum(0);
currentTokens.clear();
}
@@ -141,7 +141,7 @@ public synchronized void addKey(DelegationKey key) throws IOException {
if (key.getKeyId() > currentId) {
currentId = key.getKeyId();
}
- allKeys.put(key.getKeyId(), key);
+ storeDelegationKey(key);
}
public synchronized DelegationKey[] getAllKeys() {
@@ -163,24 +163,108 @@ protected void storeNewMasterKey(DelegationKey key) throws IOException {
return;
}
+ // for ZK based secretManager
+ protected void updateMasterKey(DelegationKey key) throws IOException{
+ return;
+ }
+
// RM
protected void removeStoredMasterKey(DelegationKey key) {
return;
}
// RM
- protected void storeNewToken(TokenIdent ident, long renewDate) {
+ protected void storeNewToken(TokenIdent ident, long renewDate) throws IOException{
return;
}
+
// RM
protected void removeStoredToken(TokenIdent ident) throws IOException {
}
// RM
- protected void updateStoredToken(TokenIdent ident, long renewDate) {
+ protected void updateStoredToken(TokenIdent ident, long renewDate) throws IOException {
return;
}
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected int getDelegationTokenSeqNum() {
+ return delegationTokenSequenceNumber;
+ }
+
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected int incrementDelegationTokenSeqNum() {
+ return ++delegationTokenSequenceNumber;
+ }
+
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected void setDelegationTokenSeqNum(int seqNum) {
+ delegationTokenSequenceNumber = seqNum;
+ }
+
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected DelegationKey getDelegationKey(int keyId) {
+ return allKeys.get(keyId);
+ }
+
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected void storeDelegationKey(DelegationKey key) throws IOException {
+ allKeys.put(key.getKeyId(), key);
+ storeNewMasterKey(key);
+ }
+
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected void updateDelegationKey(DelegationKey key) throws IOException {
+ allKeys.put(key.getKeyId(), key);
+ updateMasterKey(key);
+ }
+
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
+ return currentTokens.get(ident);
+ }
+
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected void storeToken(TokenIdent ident,
+ DelegationTokenInformation tokenInfo) throws IOException {
+ currentTokens.put(ident, tokenInfo);
+ storeNewToken(ident, tokenInfo.getRenewDate());
+ }
+
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected void updateToken(TokenIdent ident,
+ DelegationTokenInformation tokenInfo) throws IOException {
+ currentTokens.put(ident, tokenInfo);
+ updateStoredToken(ident, tokenInfo.getRenewDate());
+ }
+
/**
* This method is intended to be used for recovering persisted delegation
* tokens
@@ -196,17 +280,18 @@ public synchronized void addPersistedDelegationToken(
"Can't add persisted delegation token to a running SecretManager.");
}
int keyId = identifier.getMasterKeyId();
- DelegationKey dKey = allKeys.get(keyId);
+ DelegationKey dKey = getDelegationKey(keyId);
if (dKey == null) {
LOG.warn("No KEY found for persisted identifier " + identifier.toString());
return;
}
byte[] password = createPassword(identifier.getBytes(), dKey.getKey());
- if (identifier.getSequenceNumber() > this.delegationTokenSequenceNumber) {
- this.delegationTokenSequenceNumber = identifier.getSequenceNumber();
+ int delegationTokenSeqNum = getDelegationTokenSeqNum();
+ if (identifier.getSequenceNumber() > delegationTokenSeqNum) {
+ setDelegationTokenSeqNum(identifier.getSequenceNumber());
}
- if (currentTokens.get(identifier) == null) {
- currentTokens.put(identifier, new DelegationTokenInformation(renewDate,
+ if (getTokenInfo(identifier) == null) {
+ storeToken(identifier, new DelegationTokenInformation(renewDate,
password, getTrackingIdIfEnabled(identifier)));
} else {
throw new IOException("Same delegation token being added twice.");
@@ -234,7 +319,7 @@ private void updateCurrentKey() throws IOException {
synchronized (this) {
currentId = newKey.getKeyId();
currentKey = newKey;
- allKeys.put(currentKey.getKeyId(), currentKey);
+ storeDelegationKey(currentKey);
}
}
@@ -252,7 +337,7 @@ void rollMasterKey() throws IOException {
* updateMasterKey() isn't called at expected interval. Add it back to
* allKeys just in case.
*/
- allKeys.put(currentKey.getKeyId(), currentKey);
+ updateDelegationKey(currentKey);
}
updateCurrentKey();
}
@@ -276,19 +361,25 @@ private synchronized void removeExpiredKeys() {
protected synchronized byte[] createPassword(TokenIdent identifier) {
int sequenceNum;
long now = Time.now();
- sequenceNum = ++delegationTokenSequenceNumber;
+ sequenceNum = incrementDelegationTokenSeqNum();
identifier.setIssueDate(now);
identifier.setMaxDate(now + tokenMaxLifetime);
identifier.setMasterKeyId(currentId);
identifier.setSequenceNumber(sequenceNum);
LOG.info("Creating password for identifier: " + identifier);
byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
- storeNewToken(identifier, now + tokenRenewInterval);
- currentTokens.put(identifier, new DelegationTokenInformation(now
- + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)));
+ DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
+ + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
+ try {
+ storeToken(identifier, tokenInfo);
+ } catch (IOException ioe) {
+ LOG.error("Could not store token !!", ioe);
+ }
return password;
}
+
+
/**
* Find the DelegationTokenInformation for the given token id, and verify that
* if the token is expired. Note that this method should be called with
@@ -297,7 +388,7 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {
protected DelegationTokenInformation checkToken(TokenIdent identifier)
throws InvalidToken {
assert Thread.holdsLock(this);
- DelegationTokenInformation info = currentTokens.get(identifier);
+ DelegationTokenInformation info = getTokenInfo(identifier);
if (info == null) {
throw new InvalidToken("token (" + identifier.toString()
+ ") can't be found in cache");
@@ -322,7 +413,7 @@ protected String getTrackingIdIfEnabled(TokenIdent ident) {
}
public synchronized String getTokenTrackingId(TokenIdent identifier) {
- DelegationTokenInformation info = currentTokens.get(identifier);
+ DelegationTokenInformation info = getTokenInfo(identifier);
if (info == null) {
return null;
}
@@ -373,7 +464,7 @@ public synchronized long renewToken(Token token,
throw new AccessControlException(renewer +
" tries to renew a token with renewer " + id.getRenewer());
}
- DelegationKey key = allKeys.get(id.getMasterKeyId());
+ DelegationKey key = getDelegationKey(id.getMasterKeyId());
if (key == null) {
throw new InvalidToken("Unable to find master key for keyId="
+ id.getMasterKeyId()
@@ -390,11 +481,10 @@ public synchronized long renewToken(Token token,
DelegationTokenInformation info = new DelegationTokenInformation(renewTime,
password, trackingId);
- if (currentTokens.get(id) == null) {
+ if (getTokenInfo(id) == null) {
throw new InvalidToken("Renewal request for unknown token");
}
- currentTokens.put(id, info);
- updateStoredToken(id, renewTime);
+ updateToken(id, info);
return renewTime;
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
new file mode 100644
index 0000000000000..23c7144501752
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
@@ -0,0 +1,727 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security.token.delegation;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * An implementation of {@link AbstractDelegationTokenSecretManager} that
+ * persists TokenIdentifiers and DelegationKeys in Zookeeper. This class can
+ * be used by HA (Highly available) services that consists of multiple nodes.
+ * This class ensures that Identifiers and Keys are replicated to all nodes of
+ * the service.
+ */
+@InterfaceAudience.Private
+public abstract class ZKDelegationTokenSecretManager
+ extends AbstractDelegationTokenSecretManager {
+
+ private static final String ZK_CONF_PREFIX = "zk-dt-secret-manager.";
+ public static final String ZK_DTSM_ZK_NUM_RETRIES = ZK_CONF_PREFIX
+ + "zkNumRetries";
+ public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = ZK_CONF_PREFIX
+ + "zkSessionTimeout";
+ public static final String ZK_DTSM_ZK_CONNECTION_TIMEOUT = ZK_CONF_PREFIX
+ + "zkConnectionTimeout";
+ public static final String ZK_DTSM_ZNODE_WORKING_PATH = ZK_CONF_PREFIX
+ + "znodeWorkingPath";
+ public static final String ZK_DTSM_ZK_AUTH_TYPE = ZK_CONF_PREFIX
+ + "zkAuthType";
+ public static final String ZK_DTSM_ZK_CONNECTION_STRING = ZK_CONF_PREFIX
+ + "zkConnectionString";
+ public static final String ZK_DTSM_ZK_KERBEROS_KEYTAB = ZK_CONF_PREFIX
+ + "kerberos.keytab";
+ public static final String ZK_DTSM_ZK_KERBEROS_PRINCIPAL = ZK_CONF_PREFIX
+ + "kerberos.principal";
+
+ public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
+ public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
+ public static final int ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT = 10000;
+ public static final String ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT = "zkdtsm";
+
+ private static Logger LOG = LoggerFactory
+ .getLogger(ZKDelegationTokenSecretManager.class);
+
+ private static final String JAAS_LOGIN_ENTRY_NAME =
+ "ZKDelegationTokenSecretManagerClient";
+
+ private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot";
+ private static final String ZK_DTSM_SEQNUM_ROOT = "ZKDTSMSeqNumRoot";
+ private static final String ZK_DTSM_TOKENS_ROOT = "ZKDTSMTokensRoot";
+ private static final String ZK_DTSM_MASTER_KEY_ROOT = "ZKDTSMMasterKeyRoot";
+
+ private static final String DELEGATION_KEY_PREFIX = "DK_";
+ private static final String DELEGATION_TOKEN_PREFIX = "DT_";
+
+ private static final ThreadLocal CURATOR_TL =
+ new ThreadLocal();
+
+ public static void setCurator(CuratorFramework curator) {
+ CURATOR_TL.set(curator);
+ }
+
+ private final boolean isExternalClient;
+ private final CuratorFramework zkClient;
+ private SharedCount seqCounter;
+ private PathChildrenCache keyCache;
+ private PathChildrenCache tokenCache;
+ private ExecutorService listenerThreadPool;
+
+ public ZKDelegationTokenSecretManager(Configuration conf) {
+ super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
+ DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000,
+ conf.getLong(DelegationTokenManager.MAX_LIFETIME,
+ DelegationTokenManager.MAX_LIFETIME_DEFAULT) * 1000,
+ conf.getLong(DelegationTokenManager.RENEW_INTERVAL,
+ DelegationTokenManager.RENEW_INTERVAL_DEFAULT * 1000),
+ conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL,
+ DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
+ if (CURATOR_TL.get() != null) {
+ zkClient = CURATOR_TL.get();
+ isExternalClient = true;
+ } else {
+ String connString = conf.get(ZK_DTSM_ZK_CONNECTION_STRING);
+ Preconditions.checkNotNull(connString,
+ "Zookeeper connection string cannot be null");
+ String authType = conf.get(ZK_DTSM_ZK_AUTH_TYPE);
+
+ // AuthType has to be explicitly set to 'none' or 'sasl'
+ Preconditions.checkNotNull(authType, "Zookeeper authType cannot be null !!");
+ Preconditions.checkArgument(
+ authType.equals("sasl") || authType.equals("none"),
+ "Zookeeper authType must be one of [none, sasl]");
+
+ Builder builder = null;
+ try {
+ ACLProvider aclProvider = null;
+ if (authType.equals("sasl")) {
+ LOG.info("Connecting to ZooKeeper with SASL/Kerberos"
+ + "and using 'sasl' ACLs");
+ String principal = setJaasConfiguration(conf);
+ System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
+ JAAS_LOGIN_ENTRY_NAME);
+ System.setProperty("zookeeper.authProvider.1",
+ "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
+ aclProvider = new SASLOwnerACLProvider(principal);
+ } else { // "none"
+ LOG.info("Connecting to ZooKeeper without authentication");
+ aclProvider = new DefaultACLProvider(); // open to everyone
+ }
+ int sessionT =
+ conf.getInt(ZK_DTSM_ZK_SESSION_TIMEOUT,
+ ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT);
+ int numRetries =
+ conf.getInt(ZK_DTSM_ZK_NUM_RETRIES, ZK_DTSM_ZK_NUM_RETRIES_DEFAULT);
+ builder =
+ CuratorFrameworkFactory
+ .builder()
+ .aclProvider(aclProvider)
+ .namespace(
+ conf.get(ZK_DTSM_ZNODE_WORKING_PATH,
+ ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT)
+ + "/"
+ + ZK_DTSM_NAMESPACE
+ )
+ .sessionTimeoutMs(sessionT)
+ .connectionTimeoutMs(
+ conf.getInt(ZK_DTSM_ZK_CONNECTION_TIMEOUT,
+ ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT)
+ )
+ .retryPolicy(
+ new RetryNTimes(numRetries, sessionT / numRetries));
+ } catch (Exception ex) {
+ throw new RuntimeException("Could not Load ZK acls or auth");
+ }
+ zkClient = builder.ensembleProvider(new FixedEnsembleProvider(connString))
+ .build();
+ isExternalClient = false;
+ }
+ listenerThreadPool = Executors.newFixedThreadPool(2);
+ }
+
+ private String setJaasConfiguration(Configuration config) throws Exception {
+ String keytabFile =
+ config.get(ZK_DTSM_ZK_KERBEROS_KEYTAB, "").trim();
+ if (keytabFile == null || keytabFile.length() == 0) {
+ throw new IllegalArgumentException(ZK_DTSM_ZK_KERBEROS_KEYTAB
+ + " must be specified");
+ }
+ String principal =
+ config.get(ZK_DTSM_ZK_KERBEROS_PRINCIPAL, "").trim();
+ if (principal == null || principal.length() == 0) {
+ throw new IllegalArgumentException(ZK_DTSM_ZK_KERBEROS_PRINCIPAL
+ + " must be specified");
+ }
+
+ JaasConfiguration jConf =
+ new JaasConfiguration(JAAS_LOGIN_ENTRY_NAME, principal, keytabFile);
+ javax.security.auth.login.Configuration.setConfiguration(jConf);
+ return principal.split("[/@]")[0];
+ }
+
+ /**
+ * Creates a programmatic version of a jaas.conf file. This can be used
+ * instead of writing a jaas.conf file and setting the system property,
+ * "java.security.auth.login.config", to point to that file. It is meant to be
+ * used for connecting to ZooKeeper.
+ */
+ @InterfaceAudience.Private
+ public static class JaasConfiguration extends
+ javax.security.auth.login.Configuration {
+
+ private static AppConfigurationEntry[] entry;
+ private String entryName;
+
+ /**
+ * Add an entry to the jaas configuration with the passed in name,
+ * principal, and keytab. The other necessary options will be set for you.
+ *
+ * @param entryName
+ * The name of the entry (e.g. "Client")
+ * @param principal
+ * The principal of the user
+ * @param keytab
+ * The location of the keytab
+ */
+ public JaasConfiguration(String entryName, String principal, String keytab) {
+ this.entryName = entryName;
+ Map options = new HashMap();
+ options.put("keyTab", keytab);
+ options.put("principal", principal);
+ options.put("useKeyTab", "true");
+ options.put("storeKey", "true");
+ options.put("useTicketCache", "false");
+ options.put("refreshKrb5Config", "true");
+ String jaasEnvVar = System.getenv("HADOOP_JAAS_DEBUG");
+ if (jaasEnvVar != null && "true".equalsIgnoreCase(jaasEnvVar)) {
+ options.put("debug", "true");
+ }
+ entry = new AppConfigurationEntry[] {
+ new AppConfigurationEntry(getKrb5LoginModuleName(),
+ AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+ options) };
+ }
+
+ @Override
+ public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+ return (entryName.equals(name)) ? entry : null;
+ }
+
+ private String getKrb5LoginModuleName() {
+ String krb5LoginModuleName;
+ if (System.getProperty("java.vendor").contains("IBM")) {
+ krb5LoginModuleName = "com.ibm.security.auth.module.Krb5LoginModule";
+ } else {
+ krb5LoginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+ }
+ return krb5LoginModuleName;
+ }
+ }
+
+ @Override
+ public void startThreads() throws IOException {
+ if (!isExternalClient) {
+ try {
+ zkClient.start();
+ } catch (Exception e) {
+ throw new IOException("Could not start Curator Framework", e);
+ }
+ }
+ try {
+ seqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0);
+ if (seqCounter != null) {
+ seqCounter.start();
+ }
+ } catch (Exception e) {
+ throw new IOException("Could not start Sequence Counter", e);
+ }
+ try {
+ createPersistentNode(ZK_DTSM_MASTER_KEY_ROOT);
+ createPersistentNode(ZK_DTSM_TOKENS_ROOT);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not create ZK paths");
+ }
+ try {
+ keyCache = new PathChildrenCache(zkClient, ZK_DTSM_MASTER_KEY_ROOT, true);
+ if (keyCache != null) {
+ keyCache.start(StartMode.POST_INITIALIZED_EVENT);
+ keyCache.getListenable().addListener(new PathChildrenCacheListener() {
+ @Override
+ public void childEvent(CuratorFramework client,
+ PathChildrenCacheEvent event)
+ throws Exception {
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ processKeyAddOrUpdate(event.getData().getData());
+ break;
+ case CHILD_UPDATED:
+ processKeyAddOrUpdate(event.getData().getData());
+ break;
+ case CHILD_REMOVED:
+ processKeyRemoved(event.getData().getPath());
+ break;
+ default:
+ break;
+ }
+ }
+ }, listenerThreadPool);
+ }
+ } catch (Exception e) {
+ throw new IOException("Could not start PathChildrenCache for keys", e);
+ }
+ try {
+ tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true);
+ if (tokenCache != null) {
+ tokenCache.start(StartMode.POST_INITIALIZED_EVENT);
+ tokenCache.getListenable().addListener(new PathChildrenCacheListener() {
+
+ @Override
+ public void childEvent(CuratorFramework client,
+ PathChildrenCacheEvent event) throws Exception {
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ processTokenAddOrUpdate(event.getData().getData());
+ break;
+ case CHILD_UPDATED:
+ processTokenAddOrUpdate(event.getData().getData());
+ break;
+ case CHILD_REMOVED:
+ processTokenRemoved(event.getData().getData());
+ break;
+ default:
+ break;
+ }
+ }
+ }, listenerThreadPool);
+ }
+ } catch (Exception e) {
+ throw new IOException("Could not start PathChildrenCache for tokens", e);
+ }
+ super.startThreads();
+ }
+
+ private void processKeyAddOrUpdate(byte[] data) throws IOException {
+ ByteArrayInputStream bin = new ByteArrayInputStream(data);
+ DataInputStream din = new DataInputStream(bin);
+ DelegationKey key = new DelegationKey();
+ key.readFields(din);
+ allKeys.put(key.getKeyId(), key);
+ }
+
+ private void processKeyRemoved(String path) {
+ int i = path.lastIndexOf('/');
+ if (i > 0) {
+ String tokSeg = path.substring(i + 1);
+ int j = tokSeg.indexOf('_');
+ if (j > 0) {
+ int keyId = Integer.parseInt(tokSeg.substring(j + 1));
+ allKeys.remove(keyId);
+ }
+ }
+ }
+
+ private void processTokenAddOrUpdate(byte[] data) throws IOException {
+ ByteArrayInputStream bin = new ByteArrayInputStream(data);
+ DataInputStream din = new DataInputStream(bin);
+ TokenIdent ident = createIdentifier();
+ ident.readFields(din);
+ long renewDate = din.readLong();
+ int pwdLen = din.readInt();
+ byte[] password = new byte[pwdLen];
+ int numRead = din.read(password, 0, pwdLen);
+ if (numRead > -1) {
+ DelegationTokenInformation tokenInfo =
+ new DelegationTokenInformation(renewDate, password);
+ currentTokens.put(ident, tokenInfo);
+ }
+ }
+
+ private void processTokenRemoved(byte[] data) throws IOException {
+ ByteArrayInputStream bin = new ByteArrayInputStream(data);
+ DataInputStream din = new DataInputStream(bin);
+ TokenIdent ident = createIdentifier();
+ ident.readFields(din);
+ currentTokens.remove(ident);
+ }
+
+ @Override
+ public void stopThreads() {
+ try {
+ if (!isExternalClient && (zkClient != null)) {
+ zkClient.close();
+ }
+ if (seqCounter != null) {
+ seqCounter.close();
+ }
+ if (keyCache != null) {
+ keyCache.close();
+ }
+ if (tokenCache != null) {
+ tokenCache.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Could not stop Curator Framework", e);
+ // Ignore
+ }
+ super.stopThreads();
+ }
+
+ private void createPersistentNode(String nodePath) throws Exception {
+ try {
+ zkClient.create().withMode(CreateMode.PERSISTENT).forPath(nodePath);
+ } catch (KeeperException.NodeExistsException ne) {
+ LOG.debug(nodePath + " znode already exists !!");
+ } catch (Exception e) {
+ throw new IOException(nodePath + " znode could not be created !!", e);
+ }
+ }
+
+ @Override
+ protected int getDelegationTokenSeqNum() {
+ return seqCounter.getCount();
+ }
+
+ @Override
+ protected int incrementDelegationTokenSeqNum() {
+ try {
+ while (!seqCounter.trySetCount(seqCounter.getCount() + 1)) {
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Could not increment shared counter !!", e);
+ }
+ return seqCounter.getCount();
+ }
+
+ @Override
+ protected void setDelegationTokenSeqNum(int seqNum) {
+ delegationTokenSequenceNumber = seqNum;
+ }
+
+  @Override
+  protected DelegationKey getDelegationKey(int keyId) {
+    // First check the local cache for this key
+    DelegationKey key = allKeys.get(keyId);
+    // On a cache miss, fall back to ZooKeeper and cache any result
+    if (key == null) {
+      try {
+        key = getKeyFromZK(keyId);
+        if (key != null) {
+          allKeys.put(keyId, key);
+        }
+      } catch (IOException e) {
+        LOG.error("Error retrieving key [" + keyId + "] from ZK", e);
+      }
+    }
+    return key;
+  }
+
+ private DelegationKey getKeyFromZK(int keyId) throws IOException {
+ String nodePath =
+ getNodePath(ZK_DTSM_MASTER_KEY_ROOT, DELEGATION_KEY_PREFIX + keyId);
+ try {
+ byte[] data = zkClient.getData().forPath(nodePath);
+ if ((data == null) || (data.length == 0)) {
+ return null;
+ }
+ ByteArrayInputStream bin = new ByteArrayInputStream(data);
+ DataInputStream din = new DataInputStream(bin);
+ DelegationKey key = new DelegationKey();
+ key.readFields(din);
+ return key;
+ } catch (KeeperException.NoNodeException e) {
+ LOG.error("No node in path [" + nodePath + "]");
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ return null;
+ }
+
+ @Override
+ protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
+ // First check if I have this..
+ DelegationTokenInformation tokenInfo = currentTokens.get(ident);
+ // Then query ZK
+ if (tokenInfo == null) {
+ try {
+ tokenInfo = getTokenInfoFromZK(ident);
+ if (tokenInfo != null) {
+ currentTokens.put(ident, tokenInfo);
+ }
+ } catch (IOException e) {
+ LOG.error("Error retrieving tokenInfo [" + ident.getSequenceNumber()
+ + "] from ZK", e);
+ }
+ }
+ return tokenInfo;
+ }
+
+  private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
+      throws IOException {
+    String nodePath =
+        getNodePath(ZK_DTSM_TOKENS_ROOT,
+            DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber());
+    try {
+      byte[] data = zkClient.getData().forPath(nodePath);
+      if ((data == null) || (data.length == 0)) {
+        return null;
+      }
+      ByteArrayInputStream bin = new ByteArrayInputStream(data);
+      DataInputStream din = new DataInputStream(bin);
+      // Skip over the serialized identifier to reach renewDate/password.
+      createIdentifier().readFields(din);
+      long renewDate = din.readLong();
+      int pwdLen = din.readInt();
+      byte[] password = new byte[pwdLen];
+      if (pwdLen > 0) {
+        // readFully() avoids the silent short-read that read() permits.
+        din.readFully(password, 0, pwdLen);
+      }
+      return new DelegationTokenInformation(renewDate, password);
+    } catch (KeeperException.NoNodeException e) {
+      LOG.error("No node in path [" + nodePath + "]");
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+    return null;
+  }
+
+ @Override
+ protected void storeDelegationKey(DelegationKey key) throws IOException {
+ allKeys.put(key.getKeyId(), key);
+ addOrUpdateDelegationKey(key, false);
+ }
+
+ @Override
+ protected void updateDelegationKey(DelegationKey key) throws IOException {
+ allKeys.put(key.getKeyId(), key);
+ addOrUpdateDelegationKey(key, true);
+ }
+
+  private void addOrUpdateDelegationKey(DelegationKey key, boolean isUpdate)
+      throws IOException {
+    String nodeCreatePath =
+        getNodePath(ZK_DTSM_MASTER_KEY_ROOT,
+            DELEGATION_KEY_PREFIX + key.getKeyId());
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    DataOutputStream fsOut = new DataOutputStream(os);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing ZKDTSMDelegationKey_" + key.getKeyId());
+    }
+    key.write(fsOut);
+    try {
+      if (zkClient.checkExists().forPath(nodeCreatePath) != null) {
+        // withVersion(-1) (match any version) must precede forPath(),
+        // which executes the operation; Stat.setVersion(-1) after is a no-op.
+        zkClient.setData().withVersion(-1)
+            .forPath(nodeCreatePath, os.toByteArray());
+        if (!isUpdate) {
+          LOG.debug("Key with path [" + nodeCreatePath
+              + "] already exists.. Updating !!");
+        }
+      } else {
+        zkClient.create().withMode(CreateMode.PERSISTENT)
+            .forPath(nodeCreatePath, os.toByteArray());
+        if (isUpdate) {
+          LOG.debug("Updating non existent Key path [" + nodeCreatePath
+              + "].. Adding new !!");
+        }
+      }
+    } catch (KeeperException.NodeExistsException ne) {
+      LOG.debug(nodeCreatePath + " znode already exists !!");
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    } finally {
+      os.close();
+    }
+  }
+
+ @Override
+ protected void removeStoredMasterKey(DelegationKey key) {
+ String nodeRemovePath =
+ getNodePath(ZK_DTSM_MASTER_KEY_ROOT,
+ DELEGATION_KEY_PREFIX + key.getKeyId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing ZKDTSMDelegationKey_" + key.getKeyId());
+ }
+ try {
+ if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
+ zkClient.delete().forPath(nodeRemovePath);
+ } else {
+ LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
+ }
+ } catch (Exception e) {
+ LOG.debug(nodeRemovePath + " znode could not be removed!!");
+ }
+ }
+
+ @Override
+ protected void storeToken(TokenIdent ident,
+ DelegationTokenInformation tokenInfo) throws IOException {
+ currentTokens.put(ident, tokenInfo);
+ try {
+ addOrUpdateToken(ident, tokenInfo, false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+  @Override
+  protected void updateToken(TokenIdent ident,
+      DelegationTokenInformation tokenInfo) throws IOException {
+    currentTokens.put(ident, tokenInfo);
+    String nodeRemovePath =
+        getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+            + ident.getSequenceNumber());
+    try {
+      if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
+        // Znode exists: overwrite it in place (isUpdate = true).
+        addOrUpdateToken(ident, tokenInfo, true);
+      } else {
+        addOrUpdateToken(ident, tokenInfo, false);
+        LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Could not update Stored Token ZKDTSMDelegationToken_"
+          + ident.getSequenceNumber(), e);
+    }
+  }
+
+  @Override
+  protected void removeStoredToken(TokenIdent ident)
+      throws IOException {
+    String nodeRemovePath =
+        getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+            + ident.getSequenceNumber());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing ZKDTSMDelegationToken_"
+          + ident.getSequenceNumber());
+    }
+    try {
+      // Delete only when the znode exists; the original had these branches
+      // inverted (logged on existence, deleted a non-existent node).
+      if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
+        zkClient.delete().forPath(nodeRemovePath);
+      } else {
+        LOG.debug("Attempted to remove a non-existing znode " + nodeRemovePath);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Could not remove Stored Token ZKDTSMDelegationToken_"
+              + ident.getSequenceNumber(), e);
+    }
+  }
+
+  private void addOrUpdateToken(TokenIdent ident,
+      DelegationTokenInformation info, boolean isUpdate) throws Exception {
+    String nodeCreatePath =
+        getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+            + ident.getSequenceNumber());
+    ByteArrayOutputStream tokenOs = new ByteArrayOutputStream();
+    DataOutputStream tokenOut = new DataOutputStream(tokenOs);
+    try {
+      // Payload layout: identifier, renew date, password length, password.
+      ident.write(tokenOut);
+      tokenOut.writeLong(info.getRenewDate());
+      tokenOut.writeInt(info.getPassword().length);
+      tokenOut.write(info.getPassword());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug((isUpdate ? "Updating " : "Storing ")
+            + "ZKDTSMDelegationToken_" +
+            ident.getSequenceNumber());
+      }
+      if (isUpdate) {
+        // withVersion(-1) must precede forPath(); setVersion on the
+        // returned Stat was a no-op.
+        zkClient.setData().withVersion(-1)
+            .forPath(nodeCreatePath, tokenOs.toByteArray());
+      } else {
+        zkClient.create().withMode(CreateMode.PERSISTENT)
+            .forPath(nodeCreatePath, tokenOs.toByteArray());
+      }
+    } finally {
+      tokenOut.close();
+    }
+  }
+
+  /**
+   * {@link ACLProvider} that, for every path, returns an ACL granting all
+   * permissions to a single (SASL-authenticated) principal.
+   */
+  private static class SASLOwnerACLProvider implements ACLProvider {
+
+    private final List<ACL> saslACL;
+
+    private SASLOwnerACLProvider(String principal) {
+      this.saslACL = Collections.singletonList(
+          new ACL(Perms.ALL, new Id("sasl", principal)));
+    }
+
+    @Override
+    public List<ACL> getDefaultAcl() {
+      return saslACL;
+    }
+
+    @Override
+    public List<ACL> getAclForPath(String path) {
+      return saslACL;
+    }
+  }
+
+ @VisibleForTesting
+ @Private
+ @Unstable
+ static String getNodePath(String root, String nodeName) {
+ return (root + "/" + nodeName);
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationFilter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationFilter.java
index 64a562254b79a..aa9ec9948d454 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationFilter.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationFilter.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.security.token.delegation.web;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -28,9 +29,11 @@
import org.apache.hadoop.security.authentication.server.AuthenticationToken;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
+import org.apache.hadoop.security.authentication.util.ZKSignerSecretProvider;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
import org.apache.hadoop.util.HttpExceptionUtils;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
@@ -153,7 +156,14 @@ protected Configuration getProxyuserConfiguration(FilterConfig filterConfig)
@Override
public void init(FilterConfig filterConfig) throws ServletException {
+ // A single CuratorFramework should be used for a ZK cluster.
+ // If the ZKSignerSecretProvider has already created it, it has to
+ // be set here... to be used by the ZKDelegationTokenSecretManager
+ ZKDelegationTokenSecretManager.setCurator((CuratorFramework)
+ filterConfig.getServletContext().getAttribute(ZKSignerSecretProvider.
+ ZOOKEEPER_SIGNER_SECRET_PROVIDER_CURATOR_CLIENT_ATTRIBUTE));
super.init(filterConfig);
+ ZKDelegationTokenSecretManager.setCurator(null);
AuthenticationHandler handler = getAuthenticationHandler();
AbstractDelegationTokenSecretManager dtSecretManager =
(AbstractDelegationTokenSecretManager) filterConfig.getServletContext().
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java
index f41f892caa059..5a31d6dc29495 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java
@@ -78,19 +78,6 @@ public abstract class DelegationTokenAuthenticationHandler
public static final String TOKEN_KIND = PREFIX + "token-kind";
- public static final String UPDATE_INTERVAL = PREFIX + "update-interval.sec";
- public static final long UPDATE_INTERVAL_DEFAULT = 24 * 60 * 60;
-
- public static final String MAX_LIFETIME = PREFIX + "max-lifetime.sec";
- public static final long MAX_LIFETIME_DEFAULT = 7 * 24 * 60 * 60;
-
- public static final String RENEW_INTERVAL = PREFIX + "renew-interval.sec";
- public static final long RENEW_INTERVAL_DEFAULT = 24 * 60 * 60;
-
- public static final String REMOVAL_SCAN_INTERVAL = PREFIX +
- "removal-scan-interval.sec";
- public static final long REMOVAL_SCAN_INTERVAL_DEFAULT = 60 * 60;
-
private static final Set DELEGATION_TOKEN_OPS = new HashSet();
static final String DELEGATION_TOKEN_UGI_ATTRIBUTE =
@@ -142,7 +129,6 @@ public void setExternalDelegationTokenSecretManager(
@VisibleForTesting
@SuppressWarnings("unchecked")
public void initTokenManager(Properties config) {
- String configPrefix = authHandler.getType() + ".";
Configuration conf = new Configuration(false);
for (Map.Entry entry : config.entrySet()) {
conf.set((String) entry.getKey(), (String) entry.getValue());
@@ -153,17 +139,7 @@ public void initTokenManager(Properties config) {
"The configuration does not define the token kind");
}
tokenKind = tokenKind.trim();
- long updateInterval = conf.getLong(configPrefix + UPDATE_INTERVAL,
- UPDATE_INTERVAL_DEFAULT);
- long maxLifeTime = conf.getLong(configPrefix + MAX_LIFETIME,
- MAX_LIFETIME_DEFAULT);
- long renewInterval = conf.getLong(configPrefix + RENEW_INTERVAL,
- RENEW_INTERVAL_DEFAULT);
- long removalScanInterval = conf.getLong(
- configPrefix + REMOVAL_SCAN_INTERVAL, REMOVAL_SCAN_INTERVAL_DEFAULT);
- tokenManager = new DelegationTokenManager(new Text(tokenKind),
- updateInterval * 1000, maxLifeTime * 1000, renewInterval * 1000,
- removalScanInterval * 1000);
+ tokenManager = new DelegationTokenManager(conf, new Text(tokenKind));
tokenManager.init();
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenManager.java
index 2e6b46e413660..dbde0a29f087e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenManager.java
@@ -17,16 +17,20 @@
*/
package org.apache.hadoop.security.token.delegation.web;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
+import com.google.common.annotations.VisibleForTesting;
/**
* Delegation Token Manager used by the
@@ -35,20 +39,36 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-class DelegationTokenManager {
+public class DelegationTokenManager {
+
+ public static final String ENABLE_ZK_KEY = "zk-dt-secret-manager.enable";
+
+ public static final String PREFIX = "delegation-token.";
+
+ public static final String UPDATE_INTERVAL = PREFIX + "update-interval.sec";
+ public static final long UPDATE_INTERVAL_DEFAULT = 24 * 60 * 60;
+
+ public static final String MAX_LIFETIME = PREFIX + "max-lifetime.sec";
+ public static final long MAX_LIFETIME_DEFAULT = 7 * 24 * 60 * 60;
+
+ public static final String RENEW_INTERVAL = PREFIX + "renew-interval.sec";
+ public static final long RENEW_INTERVAL_DEFAULT = 24 * 60 * 60;
+
+ public static final String REMOVAL_SCAN_INTERVAL = PREFIX +
+ "removal-scan-interval.sec";
+ public static final long REMOVAL_SCAN_INTERVAL_DEFAULT = 60 * 60;
private static class DelegationTokenSecretManager
extends AbstractDelegationTokenSecretManager {
private Text tokenKind;
- public DelegationTokenSecretManager(Text tokenKind,
- long delegationKeyUpdateInterval,
- long delegationTokenMaxLifetime,
- long delegationTokenRenewInterval,
- long delegationTokenRemoverScanInterval) {
- super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
- delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+ public DelegationTokenSecretManager(Configuration conf, Text tokenKind) {
+ super(conf.getLong(UPDATE_INTERVAL, UPDATE_INTERVAL_DEFAULT) * 1000,
+ conf.getLong(MAX_LIFETIME, MAX_LIFETIME_DEFAULT) * 1000,
+ conf.getLong(RENEW_INTERVAL, RENEW_INTERVAL_DEFAULT) * 1000,
+ conf.getLong(REMOVAL_SCAN_INTERVAL,
+ REMOVAL_SCAN_INTERVAL_DEFAULT * 1000));
this.tokenKind = tokenKind;
}
@@ -56,21 +76,34 @@ public DelegationTokenSecretManager(Text tokenKind,
public DelegationTokenIdentifier createIdentifier() {
return new DelegationTokenIdentifier(tokenKind);
}
+ }
+
+ private static class ZKSecretManager
+ extends ZKDelegationTokenSecretManager {
+
+ private Text tokenKind;
+
+ public ZKSecretManager(Configuration conf, Text tokenKind) {
+ super(conf);
+ this.tokenKind = tokenKind;
+ }
+ @Override
+ public DelegationTokenIdentifier createIdentifier() {
+ return new DelegationTokenIdentifier(tokenKind);
+ }
}
private AbstractDelegationTokenSecretManager secretManager = null;
private boolean managedSecretManager;
private Text tokenKind;
- public DelegationTokenManager(Text tokenKind,
- long delegationKeyUpdateInterval,
- long delegationTokenMaxLifetime,
- long delegationTokenRenewInterval,
- long delegationTokenRemoverScanInterval) {
- this.secretManager = new DelegationTokenSecretManager(tokenKind,
- delegationKeyUpdateInterval, delegationTokenMaxLifetime,
- delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+ public DelegationTokenManager(Configuration conf, Text tokenKind) {
+ if (conf.getBoolean(ENABLE_ZK_KEY, false)) {
+ this.secretManager = new ZKSecretManager(conf, tokenKind);
+ } else {
+ this.secretManager = new DelegationTokenSecretManager(conf, tokenKind);
+ }
this.tokenKind = tokenKind;
managedSecretManager = true;
}
@@ -150,4 +183,9 @@ public UserGroupInformation verifyToken(Token
return id.getUser();
}
+ @VisibleForTesting
+ @SuppressWarnings("rawtypes")
+ public AbstractDelegationTokenSecretManager getDelegationTokenSecretManager() {
+ return secretManager;
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DiskChecker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DiskChecker.java
index f2ee446b4ab94..6b27ae5397da7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DiskChecker.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DiskChecker.java
@@ -102,7 +102,7 @@ public static void checkDirs(File dir) throws DiskErrorException {
*/
public static void checkDir(File dir) throws DiskErrorException {
if (!mkdirsWithExistsCheck(dir)) {
- throw new DiskErrorException("Can not create directory: "
+ throw new DiskErrorException("Cannot create directory: "
+ dir.toString());
}
checkDirAccess(dir);
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
index 70796ccde126e..e59fa1b16389b 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
@@ -64,6 +64,33 @@ all operations on a valid FileSystem MUST result in a new FileSystem that is als
def isSymlink(FS, p) = p in symlinks(FS)
+### `boolean inEncryptionZone(Path p)`
+
+Return True if the data for p is encrypted. The nature of the encryption and the
+mechanism for creating an encryption zone are implementation details not covered
+in this specification. No guarantees are made about the quality of the
+encryption. The metadata is not encrypted.
+
+#### Preconditions
+
+ if not exists(FS, p) : raise FileNotFoundException
+
+#### Postconditions
+
+#### Invariants
+
+All files and directories under a directory in an encryption zone are also in an
+encryption zone
+
+    forall d in directories(FS): inEncryptionZone(FS, d) implies
+      forall c in children(FS, d) where (isFile(FS, c) or isDir(FS, c)) :
+        inEncryptionZone(FS, c)
+
+For all files in an encrypted zone, the data is encrypted, but the encryption
+type and specification are not defined.
+
+    forall f in files(FS) where inEncryptionZone(FS, f):
+      isEncrypted(data(f))
### `FileStatus getFileStatus(Path p)`
@@ -88,6 +115,10 @@ Get the status of a path
stat.length = 0
stat.isdir = False
stat.symlink = FS.Symlinks[p]
+ if inEncryptionZone(FS, p) :
+ stat.isEncrypted = True
+ else
+ stat.isEncrypted = False
### `Path getHomeDirectory()`
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
index f5acc73b147cd..86bb64d882c06 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
@@ -469,6 +469,7 @@ private void byteBufferReadCheck(InputStream in, ByteBuffer buf,
int bufPos) throws Exception {
buf.position(bufPos);
int n = ((ByteBufferReadable) in).read(buf);
+ Assert.assertEquals(bufPos + n, buf.position());
byte[] readData = new byte[n];
buf.rewind();
buf.position(bufPos);
@@ -568,6 +569,7 @@ public void testCombinedOp() throws Exception {
// Read forward len1
ByteBuffer buf = ByteBuffer.allocate(len1);
int nRead = ((ByteBufferReadable) in).read(buf);
+ Assert.assertEquals(nRead, buf.position());
readData = new byte[nRead];
buf.rewind();
buf.get(readData);
@@ -575,9 +577,10 @@ public void testCombinedOp() throws Exception {
System.arraycopy(data, (int)pos, expectedData, 0, nRead);
Assert.assertArrayEquals(readData, expectedData);
- // Pos should be len1 + 2 * len2 + nRead
+ long lastPos = pos;
+ // Pos should be lastPos + nRead
pos = ((Seekable) in).getPos();
- Assert.assertEquals(len1 + 2 * len2 + nRead, pos);
+ Assert.assertEquals(lastPos + nRead, pos);
// Pos: 1/3 dataLen
positionedReadCheck(in , dataLen / 3);
@@ -589,13 +592,15 @@ public void testCombinedOp() throws Exception {
System.arraycopy(data, (int)pos, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData);
- // Pos should be 2 * len1 + 2 * len2 + nRead
+ lastPos = pos;
+ // Pos should be lastPos + len1
pos = ((Seekable) in).getPos();
- Assert.assertEquals(2 * len1 + 2 * len2 + nRead, pos);
+ Assert.assertEquals(lastPos + len1, pos);
// Read forward len1
buf = ByteBuffer.allocate(len1);
nRead = ((ByteBufferReadable) in).read(buf);
+ Assert.assertEquals(nRead, buf.position());
readData = new byte[nRead];
buf.rewind();
buf.get(readData);
@@ -603,6 +608,11 @@ public void testCombinedOp() throws Exception {
System.arraycopy(data, (int)pos, expectedData, 0, nRead);
Assert.assertArrayEquals(readData, expectedData);
+ lastPos = pos;
+ // Pos should be lastPos + nRead
+ pos = ((Seekable) in).getPos();
+ Assert.assertEquals(lastPos + nRead, pos);
+
// ByteBuffer read after EOF
((Seekable) in).seek(dataLen);
buf.clear();
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
index 65ebfb194666a..cbbb27e91eb96 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
@@ -30,6 +31,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
/**
@@ -65,6 +67,16 @@ public void testOpenReadZeroByteFile() throws Throwable {
assertMinusOne("initial byte read", result);
}
+ @Test
+ public void testFsIsEncrypted() throws Exception {
+ describe("create an empty file and call FileStatus.isEncrypted()");
+ final Path path = path("file");
+ createFile(getFileSystem(), path, false, new byte[0]);
+ final FileStatus stat = getFileSystem().getFileStatus(path);
+ assertFalse("Expecting false for stat.isEncrypted()",
+ stat.isEncrypted());
+ }
+
@Test
public void testOpenReadDir() throws Throwable {
describe("create & read a directory");
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index f0e389ff5de94..c1b1bfb902b15 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -276,12 +276,22 @@ private static interface StoppedProtocol {
*/
private static class StoppedRpcEngine implements RpcEngine {
- @SuppressWarnings("unchecked")
@Override
public ProtocolProxy getProxy(Class protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
) throws IOException {
+ return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+ rpcTimeout, connectionRetryPolicy, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ProtocolProxy getProxy(Class protocol, long clientVersion,
+ InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+ SocketFactory factory, int rpcTimeout,
+ RetryPolicy connectionRetryPolicy, AtomicBoolean fallbackToSimpleAuth
+ ) throws IOException {
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new StoppedInvocationHandler());
return new ProtocolProxy(protocol, proxy, false);
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestClusterTopology.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestClusterTopology.java
new file mode 100644
index 0000000000000..3ab663f3dfa22
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestClusterTopology.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestClusterTopology extends Assert {
+
+ public static class NodeElement implements Node {
+ private String location;
+ private String name;
+ private Node parent;
+ private int level;
+
+ public NodeElement(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String getNetworkLocation() {
+ return location;
+ }
+
+ @Override
+ public void setNetworkLocation(String location) {
+ this.location = location;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public Node getParent() {
+ return parent;
+ }
+
+ @Override
+ public void setParent(Node parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public int getLevel() {
+ return level;
+ }
+
+ @Override
+ public void setLevel(int i) {
+ this.level = i;
+ }
+
+ }
+
+ /**
+ * Test the count of nodes with exclude list
+ */
+ @Test
+ public void testCountNumNodes() throws Exception {
+ // create the topology
+ NetworkTopology cluster = new NetworkTopology();
+ cluster.add(getNewNode("node1", "/d1/r1"));
+ NodeElement node2 = getNewNode("node2", "/d1/r2");
+ cluster.add(node2);
+ cluster.add(getNewNode("node3", "/d1/r3"));
+ NodeElement node3 = getNewNode("node4", "/d1/r4");
+ cluster.add(node3);
+ // create exclude list
+ List excludedNodes = new ArrayList();
+
+ assertEquals("4 nodes should be available", 4,
+ cluster.countNumOfAvailableNodes(NodeBase.ROOT, excludedNodes));
+ NodeElement deadNode = getNewNode("node5", "/d1/r2");
+ excludedNodes.add(deadNode);
+ assertEquals("4 nodes should be available with extra excluded Node", 4,
+ cluster.countNumOfAvailableNodes(NodeBase.ROOT, excludedNodes));
+ // add one existing node to exclude list
+ excludedNodes.add(node3);
+ assertEquals("excluded nodes with ROOT scope should be considered", 3,
+ cluster.countNumOfAvailableNodes(NodeBase.ROOT, excludedNodes));
+ assertEquals("excluded nodes without ~ scope should be considered", 2,
+ cluster.countNumOfAvailableNodes("~" + deadNode.getNetworkLocation(),
+ excludedNodes));
+ assertEquals("excluded nodes with rack scope should be considered", 1,
+ cluster.countNumOfAvailableNodes(deadNode.getNetworkLocation(),
+ excludedNodes));
+ // adding the node in excluded scope to excluded list
+ excludedNodes.add(node2);
+ assertEquals("excluded nodes with ~ scope should be considered", 2,
+ cluster.countNumOfAvailableNodes("~" + deadNode.getNetworkLocation(),
+ excludedNodes));
+ // getting count with non-exist scope.
+ assertEquals("No nodes should be considered for non-exist scope", 0,
+ cluster.countNumOfAvailableNodes("/non-exist", excludedNodes));
+ }
+
+ private NodeElement getNewNode(String name, String rackLocation) {
+ NodeElement node = new NodeElement(name);
+ node.setNetworkLocation(rackLocation);
+ return node;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java
index 657fae3f52646..15bd9fe492404 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java
@@ -104,8 +104,7 @@ public void testSortByDistance() throws Exception {
testNodes[1] = dataNodes[2];
testNodes[2] = dataNodes[3];
testNodes[3] = dataNodes[0];
- cluster.sortByDistance(dataNodes[0], testNodes,
- testNodes.length, 0xDEADBEEF, false);
+ cluster.sortByDistance(dataNodes[0], testNodes, testNodes.length);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[1]);
assertTrue(testNodes[2] == dataNodes[2]);
@@ -116,8 +115,7 @@ public void testSortByDistance() throws Exception {
testNodes[1] = dataNodes[4];
testNodes[2] = dataNodes[1];
testNodes[3] = dataNodes[0];
- cluster.sortByDistance(dataNodes[0], testNodes,
- testNodes.length, 0xDEADBEEF, false);
+ cluster.sortByDistance(dataNodes[0], testNodes, testNodes.length);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[1]);
@@ -126,8 +124,7 @@ public void testSortByDistance() throws Exception {
testNodes[1] = dataNodes[3];
testNodes[2] = dataNodes[2];
testNodes[3] = dataNodes[0];
- cluster.sortByDistance(dataNodes[0], testNodes,
- testNodes.length, 0xDEADBEEF, false);
+ cluster.sortByDistance(dataNodes[0], testNodes, testNodes.length);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[2]);
@@ -136,8 +133,7 @@ public void testSortByDistance() throws Exception {
testNodes[1] = dataNodes[7];
testNodes[2] = dataNodes[2];
testNodes[3] = dataNodes[0];
- cluster.sortByDistance(computeNode, testNodes,
- testNodes.length, 0xDEADBEEF, false);
+ cluster.sortByDistance(computeNode, testNodes, testNodes.length);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[2]);
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java
index 4f83a57f3f69a..239b8414eb39c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java
@@ -121,7 +121,7 @@ protected void removeStoredMasterKey(DelegationKey key) {
@Override
protected void storeNewToken(TestDelegationTokenIdentifier ident,
- long renewDate) {
+ long renewDate) throws IOException {
super.storeNewToken(ident, renewDate);
isStoreNewTokenCalled = true;
}
@@ -135,7 +135,7 @@ protected void removeStoredToken(TestDelegationTokenIdentifier ident)
@Override
protected void updateStoredToken(TestDelegationTokenIdentifier ident,
- long renewDate) {
+ long renewDate) throws IOException {
super.updateStoredToken(ident, renewDate);
isUpdateStoredTokenCalled = true;
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
new file mode 100644
index 0000000000000..076c87ae689d2
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security.token.delegation;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestZKDelegationTokenSecretManager {
+
+ private static final long DAY_IN_SECS = 86400;
+
+ @Test
+ public void testZKDelTokSecretManager() throws Exception {
+ TestingServer zkServer = new TestingServer();
+ DelegationTokenManager tm1, tm2 = null;
+ zkServer.start();
+ try {
+ String connectString = zkServer.getConnectString();
+ Configuration conf = new Configuration();
+ conf.setBoolean(DelegationTokenManager.ENABLE_ZK_KEY, true);
+ conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString);
+ conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath");
+ conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none");
+ conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS);
+ conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
+ conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
+ conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS);
+ tm1 = new DelegationTokenManager(conf, new Text("foo"));
+ tm1.init();
+ tm2 = new DelegationTokenManager(conf, new Text("foo"));
+ tm2.init();
+
+ Token token =
+ tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
+ Assert.assertNotNull(token);
+ tm2.verifyToken(token);
+
+ token = tm2.createToken(UserGroupInformation.getCurrentUser(), "bar");
+ Assert.assertNotNull(token);
+ tm1.verifyToken(token);
+ } finally {
+ zkServer.close();
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/web/TestDelegationTokenManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/web/TestDelegationTokenManager.java
index 4a0e8342f21b8..496b762bc090d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/web/TestDelegationTokenManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/web/TestDelegationTokenManager.java
@@ -17,27 +17,28 @@
*/
package org.apache.hadoop.security.token.delegation.web;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Test;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-
public class TestDelegationTokenManager {
private static final long DAY_IN_SECS = 86400;
@Test
public void testDTManager() throws Exception {
- DelegationTokenManager tm = new DelegationTokenManager(new Text("foo"),
- DAY_IN_SECS, DAY_IN_SECS, DAY_IN_SECS, DAY_IN_SECS);
+ Configuration conf = new Configuration(false);
+ conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS);
+ conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
+ conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
+ conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS);
+ DelegationTokenManager tm =
+ new DelegationTokenManager(conf, new Text("foo"));
tm.init();
Token token =
tm.createToken(UserGroupInformation.getCurrentUser(), "foo");
diff --git a/hadoop-common-project/hadoop-kms/pom.xml b/hadoop-common-project/hadoop-kms/pom.xml
index 2c225cb18eb67..e6b21aad6ce45 100644
--- a/hadoop-common-project/hadoop-kms/pom.xml
+++ b/hadoop-common-project/hadoop-kms/pom.xml
@@ -187,6 +187,11 @@
metrics-core
compile
+
+ org.apache.curator
+ curator-test
+ test
+
diff --git a/hadoop-common-project/hadoop-kms/src/main/conf/kms-acls.xml b/hadoop-common-project/hadoop-kms/src/main/conf/kms-acls.xml
index 24a46b86ec49a..1d5b649c83dd4 100644
--- a/hadoop-common-project/hadoop-kms/src/main/conf/kms-acls.xml
+++ b/hadoop-common-project/hadoop-kms/src/main/conf/kms-acls.xml
@@ -23,7 +23,7 @@
*
ACL for create-key operations.
- If the user does is not in the GET ACL, the key material is not returned
+ If the user is not in the GET ACL, the key material is not returned
as part of the response.
@@ -58,7 +58,7 @@
hadoop.kms.acl.GET_KEYS
*
- ACL for get-keys operation.
+ ACL for get-keys operations.
@@ -66,7 +66,7 @@
hadoop.kms.acl.GET_METADATA
*
- ACL for get-key-metadata an get-keys-metadata operations.
+ ACL for get-key-metadata and get-keys-metadata operations.
@@ -74,7 +74,7 @@
hadoop.kms.acl.SET_KEY_MATERIAL
*
- Complimentary ACL for CREATE and ROLLOVER operation to allow the client
+ Complementary ACL for CREATE and ROLLOVER operations to allow the client
to provide the key material when creating or rolling a key.
@@ -83,7 +83,7 @@
hadoop.kms.acl.GENERATE_EEK
*
- ACL for generateEncryptedKey CryptoExtension operations
+ ACL for generateEncryptedKey CryptoExtension operations.
@@ -91,7 +91,7 @@
hadoop.kms.acl.DECRYPT_EEK
*
- ACL for decrypt EncryptedKey CryptoExtension operations
+ ACL for decryptEncryptedKey CryptoExtension operations.
diff --git a/hadoop-common-project/hadoop-kms/src/main/conf/kms-site.xml b/hadoop-common-project/hadoop-kms/src/main/conf/kms-site.xml
index 20896fc2873c1..a810ca44d2958 100644
--- a/hadoop-common-project/hadoop-kms/src/main/conf/kms-site.xml
+++ b/hadoop-common-project/hadoop-kms/src/main/conf/kms-site.xml
@@ -15,10 +15,12 @@
+
- hadoop.security.key.provider.path
+ hadoop.kms.key.provider.uri
jceks://file@/${user.home}/kms.keystore
+ URI of the backing KeyProvider for the KMS.
@@ -26,14 +28,52 @@
hadoop.security.keystore.JavaKeyStoreProvider.password
none
+ If using the JavaKeyStoreProvider, the password for the keystore file.
+
+
+ hadoop.kms.cache.enable
+ true
+
+ Whether the KMS will act as a cache for the backing KeyProvider.
+ When the cache is enabled, operations like getKeyVersion, getMetadata,
+ and getCurrentKey will sometimes return cached data without consulting
+ the backing KeyProvider. Cached values are flushed when keys are deleted
+ or modified.
+
+
+
hadoop.kms.cache.timeout.ms
+ 600000
+
+ Expiry time for the KMS key version and key metadata cache, in
+ milliseconds. This affects getKeyVersion and getMetadata.
+
+
+
+
+ hadoop.kms.current.key.cache.timeout.ms
+ 30000
+
+ Expiry time for the KMS current key cache, in milliseconds. This
+ affects getCurrentKey operations.
+
+
+
+
+
+
+ hadoop.kms.audit.aggregation.window.ms
10000
+ Duplicate audit log events within the aggregation window (specified in
+ ms) are quashed to reduce log traffic. A single message for aggregated
+ events is printed at the end of the window, along with a count of the
+ number of aggregated events.
@@ -43,7 +83,8 @@
hadoop.kms.authentication.type
simple
- simple or kerberos
+ Authentication type for the KMS. Can be either "simple"
+ or "kerberos".
@@ -51,6 +92,7 @@
hadoop.kms.authentication.kerberos.keytab
${user.home}/kms.keytab
+ Path to the keytab with credentials for the configured Kerberos principal.
@@ -58,6 +100,8 @@
hadoop.kms.authentication.kerberos.principal
HTTP/localhost
+ The Kerberos principal to use for the HTTP endpoint.
+ The principal must start with 'HTTP/' as per the Kerberos HTTP SPNEGO specification.
@@ -65,6 +109,64 @@
hadoop.kms.authentication.kerberos.name.rules
DEFAULT
+ Rules used to resolve Kerberos principal names.
+
+
+
+
+
+
+ hadoop.kms.authentication.signer.secret.provider
+ random
+
+ Indicates how the secret to sign the authentication cookies will be
+ stored. Options are 'random' (default), 'string' and 'zookeeper'.
+ If using a setup with multiple KMS instances, 'zookeeper' should be used.
+
+
+
+
+
+
+ hadoop.kms.authentication.signer.secret.provider.zookeeper.path
+ /hadoop-kms/hadoop-auth-signature-secret
+
+ The Zookeeper ZNode path where the KMS instances will store and retrieve
+ the secret from.
+
+
+
+
+ hadoop.kms.authentication.signer.secret.provider.zookeeper.connection.string
+ #HOSTNAME#:#PORT#,...
+
+ The Zookeeper connection string, a list of hostnames and port comma
+ separated.
+
+
+
+
+ hadoop.kms.authentication.signer.secret.provider.zookeeper.auth.type
+ kerberos
+
+ The Zookeeper authentication type, 'none' or 'sasl' (Kerberos).
+
+
+
+
+ hadoop.kms.authentication.signer.secret.provider.zookeeper.kerberos.keytab
+ /etc/hadoop/conf/kms.keytab
+
+ The absolute path for the Kerberos keytab with the credentials to
+ connect to Zookeeper.
+
+
+
+
+ hadoop.kms.authentication.signer.secret.provider.zookeeper.kerberos.principal
+ kms/#HOSTNAME#
+
+ The Kerberos service principal used to connect to Zookeeper.
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSAudit.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSAudit.java
index dc55a8459cf40..7ff76e54ffb8c 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSAudit.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSAudit.java
@@ -103,9 +103,17 @@ public static enum OpStatus {
private static Logger AUDIT_LOG = LoggerFactory.getLogger(KMS_LOGGER_NAME);
- KMSAudit(long delay) {
+ /**
+ * Create a new KMSAudit.
+ *
+ * @param windowMs Duplicate events within the aggregation window are quashed
+ * to reduce log traffic. A single message for aggregated
+ * events is printed at the end of the window, along with a
+ * count of the number of aggregated events.
+ */
+ KMSAudit(long windowMs) {
cache = CacheBuilder.newBuilder()
- .expireAfterWrite(delay, TimeUnit.MILLISECONDS)
+ .expireAfterWrite(windowMs, TimeUnit.MILLISECONDS)
.removalListener(
new RemovalListener() {
@Override
@@ -126,7 +134,7 @@ public void onRemoval(
public void run() {
cache.cleanUp();
}
- }, delay / 10, delay / 10, TimeUnit.MILLISECONDS);
+ }, windowMs / 10, windowMs / 10, TimeUnit.MILLISECONDS);
}
private void logEvent(AuditEvent event) {
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSAuthenticationFilter.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSAuthenticationFilter.java
index 4df6db5408413..79652f35ad2d3 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSAuthenticationFilter.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSAuthenticationFilter.java
@@ -46,7 +46,8 @@
@InterfaceAudience.Private
public class KMSAuthenticationFilter
extends DelegationTokenAuthenticationFilter {
- private static final String CONF_PREFIX = KMSConfiguration.CONFIG_PREFIX +
+
+ public static final String CONFIG_PREFIX = KMSConfiguration.CONFIG_PREFIX +
"authentication.";
@Override
@@ -56,9 +57,9 @@ protected Properties getConfiguration(String configPrefix,
Configuration conf = KMSWebApp.getConfiguration();
for (Map.Entry entry : conf) {
String name = entry.getKey();
- if (name.startsWith(CONF_PREFIX)) {
+ if (name.startsWith(CONFIG_PREFIX)) {
String value = conf.get(name);
- name = name.substring(CONF_PREFIX.length());
+ name = name.substring(CONFIG_PREFIX.length());
props.setProperty(name, value);
}
}
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
index f02811993f729..bd61ca7edf5cc 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
@@ -40,6 +40,10 @@ public class KMSConfiguration {
public static final String KEY_ACL_PREFIX = "key.acl.";
public static final String DEFAULT_KEY_ACL_PREFIX = "default.key.acl.";
+ // Property to set the backing KeyProvider
+ public static final String KEY_PROVIDER_URI = CONFIG_PREFIX +
+ "key.provider.uri";
+
// Property to Enable/Disable Caching
public static final String KEY_CACHE_ENABLE = CONFIG_PREFIX +
"cache.enable";
@@ -50,8 +54,8 @@ public class KMSConfiguration {
public static final String CURR_KEY_CACHE_TIMEOUT_KEY = CONFIG_PREFIX +
"current.key.cache.timeout.ms";
// Delay for Audit logs that need aggregation
- public static final String KMS_AUDIT_AGGREGATION_DELAY = CONFIG_PREFIX +
- "aggregation.delay.ms";
+ public static final String KMS_AUDIT_AGGREGATION_WINDOW = CONFIG_PREFIX +
+ "audit.aggregation.window.ms";
public static final boolean KEY_CACHE_ENABLE_DEFAULT = true;
// 10 mins
@@ -59,7 +63,7 @@ public class KMSConfiguration {
// 30 secs
public static final long CURR_KEY_CACHE_TIMEOUT_DEFAULT = 30 * 1000;
// 10 secs
- public static final long KMS_AUDIT_AGGREGATION_DELAY_DEFAULT = 10000;
+ public static final long KMS_AUDIT_AGGREGATION_WINDOW_DEFAULT = 10000;
// Property to Enable/Disable per Key authorization
public static final String KEY_AUTHORIZATION_ENABLE = CONFIG_PREFIX +
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSExceptionsProvider.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSExceptionsProvider.java
index 77b78ee783c1c..5cb088567c920 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSExceptionsProvider.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSExceptionsProvider.java
@@ -79,7 +79,7 @@ public Response toResponse(Exception exception) {
// we don't audit here because we did it already when checking access
doAudit = false;
} else if (throwable instanceof AuthorizationException) {
- status = Response.Status.UNAUTHORIZED;
+ status = Response.Status.FORBIDDEN;
// we don't audit here because we did it already when checking access
doAudit = false;
} else if (throwable instanceof AccessControlException) {
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
index 0827b78286e4b..325f8db27a000 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
@@ -39,6 +39,7 @@
import javax.servlet.ServletContextListener;
import java.io.File;
+import java.net.URI;
import java.net.URL;
import java.util.List;
@@ -147,8 +148,8 @@ public void contextInitialized(ServletContextEvent sce) {
kmsAudit =
new KMSAudit(kmsConf.getLong(
- KMSConfiguration.KMS_AUDIT_AGGREGATION_DELAY,
- KMSConfiguration.KMS_AUDIT_AGGREGATION_DELAY_DEFAULT));
+ KMSConfiguration.KMS_AUDIT_AGGREGATION_WINDOW,
+ KMSConfiguration.KMS_AUDIT_AGGREGATION_WINDOW_DEFAULT));
// this is required for the the JMXJsonServlet to work properly.
// the JMXJsonServlet is behind the authentication filter,
@@ -159,17 +160,12 @@ public void contextInitialized(ServletContextEvent sce) {
new AccessControlList(AccessControlList.WILDCARD_ACL_VALUE));
// intializing the KeyProvider
-
- List providers = KeyProviderFactory.getProviders(kmsConf);
- if (providers.isEmpty()) {
+ String providerString = kmsConf.get(KMSConfiguration.KEY_PROVIDER_URI);
+ if (providerString == null) {
throw new IllegalStateException("No KeyProvider has been defined");
}
- if (providers.size() > 1) {
- LOG.warn("There is more than one KeyProvider configured '{}', using " +
- "the first provider",
- kmsConf.get(KeyProviderFactory.KEY_PROVIDER_PATH));
- }
- KeyProvider keyProvider = providers.get(0);
+ KeyProvider keyProvider =
+ KeyProviderFactory.get(new URI(providerString), kmsConf);
if (kmsConf.getBoolean(KMSConfiguration.KEY_CACHE_ENABLE,
KMSConfiguration.KEY_CACHE_ENABLE_DEFAULT)) {
long keyTimeOutMillis =
diff --git a/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm b/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
index d70f2a6d62ed3..e32893b377f1c 100644
--- a/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
+++ b/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
@@ -51,7 +51,7 @@ Hadoop Key Management Server (KMS) - Documentation Sets ${project.version}
+---+
- hadoop.security.key.provider.path
+ hadoop.kms.key.provider.uri
jceks://file@/${user.home}/kms.keystore
@@ -448,16 +448,16 @@ $ keytool -genkey -alias tomcat -keyalg RSA
KMS supports access control for all non-read operations at the Key level.
All Key Access operations are classified as :
- * MANAGEMENT - createKey, deleteKey, rolloverNewVersion
+ * MANAGEMENT - createKey, deleteKey, rolloverNewVersion
- * GENERATE_EEK - generateEncryptedKey, warmUpEncryptedKeys
+ * GENERATE_EEK - generateEncryptedKey, warmUpEncryptedKeys
- * DECRYPT_EEK - decryptEncryptedKey;
+ * DECRYPT_EEK - decryptEncryptedKey
- * READ - getKeyVersion, getKeyVersions, getMetadata, getKeysMetadata,
- getCurrentKey;
+ * READ - getKeyVersion, getKeyVersions, getMetadata, getKeysMetadata,
+ getCurrentKey
- * ALL - all of the above;
+ * ALL - all of the above
These can be defined in the KMS <<>> as follows
@@ -554,40 +554,147 @@ $ keytool -genkey -alias tomcat -keyalg RSA
KMS delegation token secret manager can be configured with the following
properties:
- +---+
-
- hadoop.kms.authentication.delegation-token.update-interval.sec
- 86400
-
- How often the master key is rotated, in seconds. Default value 1 day.
-
-
-
-
- hadoop.kms.authentication.delegation-token.max-lifetime.sec
- 604800
-
- Maximum lifetime of a delagation token, in seconds. Default value 7 days.
-
-
-
-
- hadoop.kms.authentication.delegation-token.renew-interval.sec
- 86400
-
- Renewal interval of a delagation token, in seconds. Default value 1 day.
-
-
-
-
- hadoop.kms.authentication.delegation-token.removal-scan-interval.sec
- 3600
-
- Scan interval to remove expired delegation tokens.
-
-
- +---+
++---+
+
+ hadoop.kms.authentication.delegation-token.update-interval.sec
+ 86400
+
+ How often the master key is rotated, in seconds. Default value 1 day.
+
+
+
+
+ hadoop.kms.authentication.delegation-token.max-lifetime.sec
+ 604800
+
+ Maximum lifetime of a delegation token, in seconds. Default value 7 days.
+
+
+
+
+ hadoop.kms.authentication.delegation-token.renew-interval.sec
+ 86400
+
+ Renewal interval of a delegation token, in seconds. Default value 1 day.
+
+
+
+
+ hadoop.kms.authentication.delegation-token.removal-scan-interval.sec
+ 3600
+
+ Scan interval to remove expired delegation tokens.
+
+
++---+
+
+
+** Using Multiple Instances of KMS Behind a Load-Balancer or VIP
+
+ KMS supports multiple KMS instances behind a load-balancer or VIP for
+ scalability and for HA purposes.
+
+ When using multiple KMS instances behind a load-balancer or VIP, requests from
+ the same user may be handled by different KMS instances.
+
+ KMS instances behind a load-balancer or VIP must be specially configured to
+ work properly as a single logical service.
+
+*** HTTP Kerberos Principals Configuration
+
+ When KMS instances are behind a load-balancer or VIP, clients will use the
+ hostname of the VIP. For Kerberos SPNEGO authentication, the hostname of the
+ URL is used to construct the Kerberos service name of the server,
+ <<>>. This means that all KMS instances must have a Kerberos
+ service name with the load-balancer or VIP hostname.
+
+ In order to be able to access directly a specific KMS instance, the KMS
+ instance must also have a Kerberos service name with its own hostname. This is
+ required for monitoring and admin purposes.
+
+ Both Kerberos service principal credentials (for the load-balancer/VIP
+ hostname and for the actual KMS instance hostname) must be in the keytab file
+ configured for authentication. And the principal name specified in the
+ configuration must be '*'. For example:
+
++---+
+
+ hadoop.kms.authentication.kerberos.principal
+ *
+
++---+
+
+ <> If using HTTPS, the SSL certificate used by the KMS instance must
+ be configured to support multiple hostnames (see Java 7
+ <<>> SAN extension support for details on how to do this).
+
+*** HTTP Authentication Signature
+
+ KMS uses Hadoop Authentication for HTTP authentication. Hadoop Authentication
+ issues a signed HTTP Cookie once the client has authenticated successfully.
+ This HTTP Cookie has an expiration time, after which it will trigger a new
+ authentication sequence. This is done to avoid triggering the authentication
+ on every HTTP request of a client.
+
+ A KMS instance must verify the HTTP Cookie signatures signed by other KMS
+ instances. To do this all KMS instances must share the signing secret.
+
+ This secret sharing can be done using a Zookeeper service which is configured
+ in KMS with the following properties in the <<>>:
+
++---+
+
+ hadoop.kms.authentication.signer.secret.provider
+ zookeeper
+
+ Indicates how the secret to sign the authentication cookies will be
+ stored. Options are 'random' (default), 'string' and 'zookeeper'.
+ If using a setup with multiple KMS instances, 'zookeeper' should be used.
+
+
+
+ hadoop.kms.authentication.signer.secret.provider.zookeeper.path
+ /hadoop-kms/hadoop-auth-signature-secret
+
+ The Zookeeper ZNode path where the KMS instances will store and retrieve
+ the secret from.
+
+
+
+ hadoop.kms.authentication.signer.secret.provider.zookeeper.connection.string
+ #HOSTNAME#:#PORT#,...
+
+ The Zookeeper connection string, a list of hostnames and port comma
+ separated.
+
+
+
+ hadoop.kms.authentication.signer.secret.provider.zookeeper.auth.type
+ kerberos
+
+ The Zookeeper authentication type, 'none' or 'sasl' (Kerberos).
+
+
+
+ hadoop.kms.authentication.signer.secret.provider.zookeeper.kerberos.keytab
+ /etc/hadoop/conf/kms.keytab
+
+ The absolute path for the Kerberos keytab with the credentials to
+ connect to Zookeeper.
+
+
+
+ hadoop.kms.authentication.signer.secret.provider.zookeeper.kerberos.principal
+ kms/#HOSTNAME#
+
+ The Kerberos service principal used to connect to Zookeeper.
+
+
++---+
+
+*** Delegation Tokens
+ TBD
** KMS HTTP REST API
diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/MiniKMS.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/MiniKMS.java
index f64dcf0e1aa8e..697d7ec6d62f3 100644
--- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/MiniKMS.java
+++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/MiniKMS.java
@@ -166,10 +166,9 @@ public void start() throws Exception {
File kmsFile = new File(kmsConfDir, "kms-site.xml");
if (!kmsFile.exists()) {
Configuration kms = new Configuration(false);
- kms.set("hadoop.security.key.provider.path",
+ kms.set(KMSConfiguration.KEY_PROVIDER_URI,
"jceks://file@" + new Path(kmsConfDir, "kms.keystore").toUri());
kms.set("hadoop.kms.authentication.type", "simple");
- kms.setBoolean(KMSConfiguration.KEY_AUTHORIZATION_ENABLE, false);
Writer writer = new FileWriter(kmsFile);
kms.writeXml(writer);
writer.close();
diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
index f4f9fead63e58..921141766a3c7 100644
--- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
+++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java
@@ -117,13 +117,14 @@ protected void runServer(String keystore, String password, File confDir,
protected Configuration createBaseKMSConf(File keyStoreDir) throws Exception {
Configuration conf = new Configuration(false);
- conf.set("hadoop.security.key.provider.path",
+ conf.set(KMSConfiguration.KEY_PROVIDER_URI,
"jceks://file@" + new Path(keyStoreDir.getAbsolutePath(), "kms.keystore").toUri());
conf.set("hadoop.kms.authentication.type", "simple");
return conf;
}
- protected void writeConf(File confDir, Configuration conf) throws Exception {
+ public static void writeConf(File confDir, Configuration conf)
+ throws Exception {
Writer writer = new FileWriter(new File(confDir,
KMSConfiguration.KMS_SITE_XML));
conf.writeXml(writer);
@@ -139,7 +140,7 @@ protected void writeConf(File confDir, Configuration conf) throws Exception {
writer.close();
}
- protected URI createKMSUri(URL kmsUrl) throws Exception {
+ public static URI createKMSUri(URL kmsUrl) throws Exception {
String str = kmsUrl.toString();
str = str.replaceFirst("://", "@");
return new URI("kms://" + str);
diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSWithZK.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSWithZK.java
new file mode 100644
index 0000000000000..3a02a0a281047
--- /dev/null
+++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSWithZK.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key.kms.server;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.crypto.key.KeyProvider.Options;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
+import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
+import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
+import org.apache.hadoop.crypto.key.kms.KMSRESTConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
+import org.apache.hadoop.security.authentication.util.ZKSignerSecretProvider;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.LoginContext;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URL;
+import java.security.Principal;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+public class TestKMSWithZK {
+
+ protected Configuration createBaseKMSConf(File keyStoreDir) throws Exception {
+ Configuration conf = new Configuration(false);
+ conf.set(KMSConfiguration.KEY_PROVIDER_URI,
+ "jceks://file@" + new Path(keyStoreDir.getAbsolutePath(),
+ "kms.keystore").toUri());
+ conf.set("hadoop.kms.authentication.type", "simple");
+ conf.setBoolean(KMSConfiguration.KEY_AUTHORIZATION_ENABLE, false);
+
+ conf.set(KMSACLs.Type.GET_KEYS.getAclConfigKey(), "foo");
+ return conf;
+ }
+
+ @Test
+ public void testMultipleKMSInstancesWithZKSigner() throws Exception {
+ final File testDir = TestKMS.getTestDir();
+ Configuration conf = createBaseKMSConf(testDir);
+
+ TestingServer zkServer = new TestingServer();
+ zkServer.start();
+
+ MiniKMS kms1 = null;
+ MiniKMS kms2 = null;
+
+ conf.set(KMSAuthenticationFilter.CONFIG_PREFIX +
+ AuthenticationFilter.SIGNER_SECRET_PROVIDER, "zookeeper");
+ conf.set(KMSAuthenticationFilter.CONFIG_PREFIX +
+ ZKSignerSecretProvider.ZOOKEEPER_CONNECTION_STRING,
+ zkServer.getConnectString());
+ conf.set(KMSAuthenticationFilter.CONFIG_PREFIX +
+ ZKSignerSecretProvider.ZOOKEEPER_PATH, "/secret");
+ TestKMS.writeConf(testDir, conf);
+
+ try {
+ kms1 = new MiniKMS.Builder()
+ .setKmsConfDir(testDir).setLog4jConfFile("log4j.properties").build();
+ kms1.start();
+
+ kms2 = new MiniKMS.Builder()
+ .setKmsConfDir(testDir).setLog4jConfFile("log4j.properties").build();
+ kms2.start();
+
+ final URL url1 = new URL(kms1.getKMSUrl().toExternalForm() +
+ KMSRESTConstants.SERVICE_VERSION + "/" +
+ KMSRESTConstants.KEYS_NAMES_RESOURCE);
+ final URL url2 = new URL(kms2.getKMSUrl().toExternalForm() +
+ KMSRESTConstants.SERVICE_VERSION + "/" +
+ KMSRESTConstants.KEYS_NAMES_RESOURCE);
+
+ final DelegationTokenAuthenticatedURL.Token token =
+ new DelegationTokenAuthenticatedURL.Token();
+ final DelegationTokenAuthenticatedURL aUrl =
+ new DelegationTokenAuthenticatedURL();
+
+ UserGroupInformation ugiFoo = UserGroupInformation.createUserForTesting(
+ "foo", new String[]{"gfoo"});
+ UserGroupInformation ugiBar = UserGroupInformation.createUserForTesting(
+ "bar", new String[]{"gBar"});
+
+ ugiFoo.doAs(new PrivilegedExceptionAction