Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import javax.security.sasl.Sasl;

import org.apache.commons.codec.binary.Base64;
Expand All @@ -52,6 +53,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.HandshakeSecretProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
import org.slf4j.Logger;
Expand Down Expand Up @@ -204,6 +206,26 @@ public static SaslPropertiesResolver getSaslPropertiesResolver(
return resolver;
}

private static <T> T readSaslMessage(InputStream in,
Function<DataTransferEncryptorMessageProto, ? extends T> handler) throws IOException {
DataTransferEncryptorMessageProto proto =
DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
switch (proto.getStatus()) {
case ERROR_UNKNOWN_KEY:
throw new InvalidEncryptionKeyException(proto.getMessage());
case ERROR:
if (proto.hasAccessTokenError() && proto.getAccessTokenError()) {
throw new InvalidBlockTokenException(proto.getMessage());
}
throw new IOException(proto.getMessage());
case SUCCESS:
return handler.apply(proto);
default:
throw new IOException(
"Unknown status: " + proto.getStatus() + ", message: " + proto.getMessage());
}
}

/**
* Reads a SASL negotiation message.
*
Expand All @@ -212,15 +234,7 @@ public static SaslPropertiesResolver getSaslPropertiesResolver(
* @throws IOException for any error
*/
public static byte[] readSaslMessage(InputStream in) throws IOException {
DataTransferEncryptorMessageProto proto =
DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
throw new InvalidEncryptionKeyException(proto.getMessage());
} else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
throw new IOException(proto.getMessage());
} else {
return proto.getPayload().toByteArray();
}
return readSaslMessage(in, proto -> proto.getPayload().toByteArray());
}

/**
Expand All @@ -233,21 +247,15 @@ public static byte[] readSaslMessage(InputStream in) throws IOException {
*/
public static byte[] readSaslMessageAndNegotiationCipherOptions(
InputStream in, List<CipherOption> cipherOptions) throws IOException {
DataTransferEncryptorMessageProto proto =
DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
throw new InvalidEncryptionKeyException(proto.getMessage());
} else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
throw new IOException(proto.getMessage());
} else {
return readSaslMessage(in, proto -> {
List<CipherOptionProto> optionProtos = proto.getCipherOptionList();
if (optionProtos != null) {
for (CipherOptionProto optionProto : optionProtos) {
cipherOptions.add(PBHelperClient.convert(optionProto));
}
}
return proto.getPayload().toByteArray();
}
});
}

static class SaslMessageWithHandshake {
Expand Down Expand Up @@ -276,13 +284,7 @@ String getBpid() {

public static SaslMessageWithHandshake readSaslMessageWithHandshakeSecret(
InputStream in) throws IOException {
DataTransferEncryptorMessageProto proto =
DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
throw new InvalidEncryptionKeyException(proto.getMessage());
} else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
throw new IOException(proto.getMessage());
} else {
return readSaslMessage(in, proto -> {
byte[] payload = proto.getPayload().toByteArray();
byte[] secret = null;
String bpid = null;
Expand All @@ -292,7 +294,7 @@ public static SaslMessageWithHandshake readSaslMessageWithHandshakeSecret(
bpid = handshakeSecret.getBpid();
}
return new SaslMessageWithHandshake(payload, secret, bpid);
}
});
}

/**
Expand Down Expand Up @@ -467,13 +469,7 @@ public static void sendSaslMessageAndNegotiationCipherOptions(
public static SaslResponseWithNegotiatedCipherOption
readSaslMessageAndNegotiatedCipherOption(InputStream in)
throws IOException {
DataTransferEncryptorMessageProto proto =
DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
throw new InvalidEncryptionKeyException(proto.getMessage());
} else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
throw new IOException(proto.getMessage());
} else {
return readSaslMessage(in, proto -> {
byte[] response = proto.getPayload().toByteArray();
List<CipherOption> options = PBHelperClient.convertCipherOptionProtos(
proto.getCipherOptionList());
Expand All @@ -482,7 +478,7 @@ public static void sendSaslMessageAndNegotiationCipherOptions(
option = options.get(0);
}
return new SaslResponseWithNegotiatedCipherOption(response, option);
}
});
}

/**
Expand Down Expand Up @@ -558,6 +554,13 @@ public static void sendSaslMessage(OutputStream out,
DataTransferEncryptorStatus status, byte[] payload, String message,
HandshakeSecretProto handshakeSecret)
throws IOException {
sendSaslMessage(out, status, payload, message, handshakeSecret, false);
}

public static void sendSaslMessage(OutputStream out,
DataTransferEncryptorStatus status, byte[] payload, String message,
HandshakeSecretProto handshakeSecret, boolean accessTokenError)
throws IOException {
DataTransferEncryptorMessageProto.Builder builder =
DataTransferEncryptorMessageProto.newBuilder();

Expand All @@ -571,6 +574,9 @@ public static void sendSaslMessage(OutputStream out,
if (handshakeSecret != null) {
builder.setHandshakeSecret(handshakeSecret);
}
if (accessTokenError) {
builder.setAccessTokenError(true);
}

DataTransferEncryptorMessageProto proto = builder.build();
proto.writeDelimitedTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,11 +588,11 @@ private IOStreamPair doSaslHandshake(InetAddress addr,
// the client accepts some cipher suites, but the server does not.
LOG.debug("Client accepts cipher suites {}, "
+ "but server {} does not accept any of them",
cipherSuites, addr.toString());
cipherSuites, addr);
}
} else {
LOG.debug("Client using cipher suite {} with server {}",
cipherOption.getCipherSuite().getName(), addr.toString());
cipherOption.getCipherSuite().getName(), addr);
}
}
}
Expand All @@ -603,7 +603,20 @@ private IOStreamPair doSaslHandshake(InetAddress addr,
conf, cipherOption, underlyingOut, underlyingIn, false) :
sasl.createStreamPair(out, in);
} catch (IOException ioe) {
sendGenericSaslErrorMessage(out, ioe.getMessage());
String message = ioe.getMessage();
try {
sendGenericSaslErrorMessage(out, message);
} catch (Exception e) {
// If ioe is caused by error response from server, server will close peer connection.
// So sendGenericSaslErrorMessage might cause IOException due to "Broken pipe".
// We suppress IOException from sendGenericSaslErrorMessage
// and always throw `ioe` as top level.
// `ioe` can be InvalidEncryptionKeyException or InvalidBlockTokenException
// that indicates refresh key or token and are important for caller.
LOG.debug("Failed to send generic sasl error to server {} (message: {}), "
+ "suppress exception", addr, message, e);
ioe.addSuppressed(e);
}
throw ioe;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ message DataTransferEncryptorMessageProto {
optional string message = 3;
repeated CipherOptionProto cipherOption = 4;
optional HandshakeSecretProto handshakeSecret = 5;
optional bool accessTokenError = 6;
}

message HandshakeSecretProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -441,6 +443,14 @@ private IOStreamPair doSaslHandshake(Peer peer, OutputStream underlyingOut,
// error, the client will get a new encryption key from the NN and retry
// connecting to this DN.
sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage());
} else if (ioe instanceof SaslException &&
ioe.getCause() != null &&
(ioe.getCause() instanceof InvalidBlockTokenException ||
ioe.getCause() instanceof SecretManager.InvalidToken)) {
// This could be because the client is long-lived and block token is expired
// The client will get new block token from the NN, upon receiving this error
// and retry connecting to this DN
sendInvalidTokenSaslErrorMessage(out, ioe.getCause().getMessage());
} else {
sendGenericSaslErrorMessage(out, ioe.getMessage());
}
Expand All @@ -460,4 +470,16 @@ private static void sendInvalidKeySaslErrorMessage(DataOutputStream out,
sendSaslMessage(out, DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY, null,
message);
}

/**
* Sends a SASL negotiation message indicating an invalid token error.
*
* @param out stream to receive message
* @param message to send
* @throws IOException for any error
*/
private static void sendInvalidTokenSaslErrorMessage(DataOutputStream out,
String message) throws IOException {
sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message, null, true);
}
}
Loading