Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,11 @@ public boolean enableAbfsListIterator() {
return this.enableAbfsListIterator;
}

/**
 * Looks up the client-provided encryption key in the raw configuration.
 *
 * @return the value stored under the name produced by
 *         {@code accountConf(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY)},
 *         or {@code null} when no value is configured
 */
public String getClientProvidedEncryptionKey() {
  return rawConfig.get(
      accountConf(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY), null);
}

@VisibleForTesting
void setReadBufferSize(int bufferSize) {
this.readBufferSize = bufferSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ public final class ConfigurationKeys {
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
/** Setting this to true makes the driver use its own RemoteIterator implementation. */
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
/** Configuration key under which the client-provided encryption key is set: {@value}. */
public static final String FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY = "fs.azure.client-provided-encryption-key";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dot as a delimiter has been the trend. Any reason to deviate?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not use the dot here; the dot is used for namespacing. A hyphen (-) is fine; otherwise make it one word.


/** End point of ABFS account: {@value}. */
public static final String AZURE_ABFS_ENDPOINT = "fs.azure.abfs.endpoint";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public final class FileSystemConfigurations {
public static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost";
public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 5000;

/**
 * Algorithm name sent in the x-ms-encryption-algorithm request header
 * whenever a client-provided encryption key is attached to a request.
 */
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM = "AES256";

public static final int MAX_CONCURRENT_READ_THREADS = 12;
public static final int MAX_CONCURRENT_WRITE_THREADS = 8;
public static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_UMASK = "x-ms-umask";
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";
// Request headers that carry a client-provided encryption key: the
// Base64-encoded key, the Base64-encoded SHA-256 digest of the key, and the
// encryption algorithm name.
public static final String X_MS_ENCRYPTION_KEY = "x-ms-encryption-key";
public static final String X_MS_ENCRYPTION_KEY_SHA256 = "x-ms-encryption-key-sha256";
public static final String X_MS_ENCRYPTION_ALGORITHM = "x-ms-encryption-algorithm";
// NOTE(review): presumably headers reporting whether the service encrypted
// the request/stored data; not referenced by the code visible here - confirm.
public static final String X_MS_REQUEST_SERVER_ENCRYPTED = "x-ms-request-server-encrypted";
public static final String X_MS_SERVER_ENCRYPTED = "x-ms-server-encrypted";

private HttpHeaderConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Locale;

import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
Expand All @@ -52,6 +57,7 @@

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
Expand All @@ -61,6 +67,7 @@
*/
public class AbfsClient implements Closeable {
public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);

private final URL baseUrl;
private final SharedKeyCredentials sharedKeyCredentials;
private final String xMsVersion = "2019-12-12";
Expand All @@ -69,6 +76,8 @@ public class AbfsClient implements Closeable {
private final AbfsConfiguration abfsConfiguration;
private final String userAgent;
private final AbfsPerfTracker abfsPerfTracker;
private final String clientProvidedEncryptionKey;
private final String clientProvidedEncryptionKeySHA;

private final String accountName;
private final AuthType authType;
Expand All @@ -78,7 +87,8 @@ public class AbfsClient implements Closeable {

private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AbfsClientContext abfsClientContext) {
final AbfsClientContext abfsClientContext)
throws IOException {
this.baseUrl = baseUrl;
this.sharedKeyCredentials = sharedKeyCredentials;
String baseUrlString = baseUrl.toString();
Expand All @@ -88,6 +98,17 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCreden
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
this.authType = abfsConfiguration.getAuthType(accountName);

String encryptionKey = this.abfsConfiguration
.getClientProvidedEncryptionKey();
if (encryptionKey != null) {
this.clientProvidedEncryptionKey = getBase64EncodedString(encryptionKey);
this.clientProvidedEncryptionKeySHA = getBase64EncodedString(
getSHA256Hash(encryptionKey));
} else {
this.clientProvidedEncryptionKey = null;
this.clientProvidedEncryptionKeySHA = null;
}

String sslProviderName = null;

if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) {
Expand All @@ -111,19 +132,38 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCreden
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
final AbfsClientContext abfsClientContext) {
final AbfsClientContext abfsClientContext)
throws IOException {
this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
this.tokenProvider = tokenProvider;
}

public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final SASTokenProvider sasTokenProvider,
final AbfsClientContext abfsClientContext) {
final AbfsClientContext abfsClientContext)
throws IOException {
this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
this.sasTokenProvider = sasTokenProvider;
}

/**
 * Computes the SHA-256 digest of the UTF-8 encoding of the given key.
 *
 * @param key text to hash
 * @return the raw 32-byte digest
 * @throws IOException wrapping {@link NoSuchAlgorithmException} if no
 *         SHA-256 implementation is available
 */
private byte[] getSHA256Hash(String key) throws IOException {
  final MessageDigest sha256;
  try {
    sha256 = MessageDigest.getInstance("SHA-256");
  } catch (NoSuchAlgorithmException missingAlgorithm) {
    throw new IOException(missingAlgorithm);
  }
  final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
  return sha256.digest(keyBytes);
}

/**
 * Base64-encodes the UTF-8 bytes of the given text.
 *
 * @param key text to encode
 * @return the Base64 representation of {@code key}
 */
private String getBase64EncodedString(String key) {
  final byte[] utf8Bytes = key.getBytes(StandardCharsets.UTF_8);
  return getBase64EncodedString(utf8Bytes);
}

/**
 * Base64-encodes the given bytes using the basic (non-URL) encoder.
 *
 * @param bytes data to encode
 * @return the Base64 representation of {@code bytes}
 */
private String getBase64EncodedString(byte[] bytes) {
  final Base64.Encoder encoder = Base64.getEncoder();
  return encoder.encodeToString(bytes);
}

@Override
public void close() throws IOException {
if (tokenProvider instanceof Closeable) {
Expand Down Expand Up @@ -159,6 +199,18 @@ List<AbfsHttpHeader> createDefaultHeaders() {
return requestHeaders;
}

/**
 * Appends the client-provided-key headers (key, key SHA-256 and algorithm)
 * to the given request headers. Does nothing when no key was configured.
 *
 * @param requestHeaders mutable header list to append to
 */
private void addCustomerProvidedKeyHeaders(
    final List<AbfsHttpHeader> requestHeaders) {
  if (clientProvidedEncryptionKey == null) {
    return;
  }
  final AbfsHttpHeader keyHeader =
      new AbfsHttpHeader(X_MS_ENCRYPTION_KEY, clientProvidedEncryptionKey);
  requestHeaders.add(keyHeader);
  final AbfsHttpHeader keyShaHeader = new AbfsHttpHeader(
      X_MS_ENCRYPTION_KEY_SHA256, clientProvidedEncryptionKeySHA);
  requestHeaders.add(keyShaHeader);
  final AbfsHttpHeader algorithmHeader = new AbfsHttpHeader(
      X_MS_ENCRYPTION_ALGORITHM, SERVER_SIDE_ENCRYPTION_ALGORITHM);
  requestHeaders.add(algorithmHeader);
}

AbfsUriQueryBuilder createDefaultUriQueryBuilder() {
final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_TIMEOUT, DEFAULT_TIMEOUT);
Expand Down Expand Up @@ -268,6 +320,9 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin
final String permission, final String umask,
final boolean isAppendBlob, final String eTag) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
if (isFile) {
addCustomerProvidedKeyHeaders(requestHeaders);
}
if (!overwrite) {
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
}
Expand Down Expand Up @@ -412,6 +467,7 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
AppendRequestParameters reqParams, final String cachedSasToken)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
addCustomerProvidedKeyHeaders(requestHeaders);
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
Expand Down Expand Up @@ -495,6 +551,7 @@ public AbfsRestOperation flush(final String path, final long position, boolean r
boolean isClose, final String cachedSasToken)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
addCustomerProvidedKeyHeaders(requestHeaders);
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
Expand Down Expand Up @@ -523,6 +580,7 @@ public AbfsRestOperation flush(final String path, final long position, boolean r
public AbfsRestOperation setPathProperties(final String path, final String properties)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
addCustomerProvidedKeyHeaders(requestHeaders);
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
Expand Down Expand Up @@ -556,6 +614,8 @@ public AbfsRestOperation getPathStatus(final String path, final boolean includeP
// only traversal (execute) permission is required.
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_STATUS);
operation = SASTokenProvider.GET_STATUS_OPERATION;
} else {
addCustomerProvidedKeyHeaders(requestHeaders);
}
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed()));
appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
Expand All @@ -574,6 +634,7 @@ public AbfsRestOperation getPathStatus(final String path, final boolean includeP
public AbfsRestOperation read(final String path, final long position, final byte[] buffer, final int bufferOffset,
final int bufferLength, final String eTag, String cachedSasToken) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
addCustomerProvidedKeyHeaders(requestHeaders);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For debugging purposes, add a log line if the request fails and the SHA-256 of the key sent in the request differs from the SHA-256 returned by the server in the response.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The server doesn't send back the actual SHA of the encryption key with which the data is encrypted if the encryption key sent is wrong.

requestHeaders.add(new AbfsHttpHeader(RANGE,
String.format("bytes=%d-%d", position, position + bufferLength - 1)));
requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
private String requestId = "";
private String expectedAppendPos = "";
private ListResultSchema listResultSchema = null;
private List<AbfsHttpHeader> responseHeaders;

// metrics
private int bytesSent;
Expand Down Expand Up @@ -339,9 +340,10 @@ public void processResponse(final byte[] buffer, final int offset, final int len
if (this.requestId == null) {
this.requestId = AbfsHttpConstants.EMPTY_STRING;
}
responseHeaders = AbfsIoUtils.getResponseHeaders(connection);
// dump the headers
AbfsIoUtils.dumpHeadersToDebugLog("Response Headers",
connection.getHeaderFields());
responseHeaders);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this change required ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A new method was introduced to print response headers; it simply accepts the list of response headers.


if (AbfsHttpConstants.HTTP_METHOD_HEAD.equals(this.method)) {
// If it is HEAD, and it is ERROR
Expand Down Expand Up @@ -558,6 +560,10 @@ public String getSignatureMaskedEncodedUrl() {
return this.maskedEncodedUrl;
}

/**
 * Returns the response headers held by this operation.
 *
 * @return the stored header list; {@code null} if none has been stored yet
 */
public List<AbfsHttpHeader> getResponseHeaders() {
  return responseHeaders;
}

public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation {
/**
* Creates an instance to represent fixed results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -67,4 +69,42 @@ public static void dumpHeadersToDebugLog(final String origin,
}
}
}

/**
 * Writes the given headers to the debug log, one {@code name=value} line
 * per header, preceded by a line containing {@code origin}.
 * Values of headers whose name contains "Cookie" or equals "sig" are
 * replaced with placeholders. Nothing is logged for a null or empty list.
 *
 * @param origin label logged before the headers
 * @param headers headers to log; may be {@code null}
 */
public static void dumpHeadersToDebugLog(final String origin,
    final List<AbfsHttpHeader> headers) {
  if (headers == null || headers.isEmpty()) {
    return;
  }
  LOG.debug("{}", origin);
  for (final AbfsHttpHeader entry : headers) {
    String name = entry.getName();
    String loggedValue = entry.getValue();
    if (name == null) {
      // A header without a name is logged under a fixed label.
      name = "HTTP Response";
    }
    if (name.contains("Cookie")) {
      loggedValue = "*cookie info*";
    } else if ("sig".equals(name)) {
      loggedValue = "XXXX";
    }
    LOG.debug(" {}={}", name, loggedValue);
  }
}

/**
 * Converts the header fields of a connection into a list of
 * {@link AbfsHttpHeader}s. Multiple values of one header are joined with
 * ";", and a header without a name is stored as "HTTP Response".
 *
 * @param connection connection whose header fields are read
 * @return a new list with one entry per header field
 */
public static List<AbfsHttpHeader> getResponseHeaders(
    final HttpURLConnection connection) {
  final Map<String, List<String>> headerFields = connection.getHeaderFields();
  final List<AbfsHttpHeader> collected = new ArrayList<>();
  for (final Map.Entry<String, List<String>> field : headerFields.entrySet()) {
    final String rawName = field.getKey();
    final String name = (rawName != null) ? rawName : "HTTP Response";
    final String joinedValues = StringUtils.join(";", field.getValue());
    collected.add(new AbfsHttpHeader(name, joinedValues));
  }
  return collected;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.HttpException;

import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY_SHA256;

/**
* The AbfsRestOperation for Rest AbfsClient.
*/
Expand All @@ -50,6 +52,7 @@ public class AbfsRestOperation {
private final URL url;
// all the custom HTTP request headers provided by the caller
private final List<AbfsHttpHeader> requestHeaders;
private List<AbfsHttpHeader> responseHeaders;

// This is a simple operation class, where all the upload methods have a
// request body and all the download methods have a response body.
Expand Down Expand Up @@ -287,6 +290,10 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS

return false;
} finally {
if (httpOperation != null) {
this.responseHeaders = httpOperation.getResponseHeaders();

}
AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
}

Expand All @@ -301,6 +308,28 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS
return true;
}

/**
 * Compares, ignoring case, the encryption-key SHA-256 header value found
 * in the request headers with the one found in the response headers, and
 * logs both at debug level when they differ.
 *
 * @return {@code true} when the two values differ
 */
private boolean isCPKShaMismatch() {
  final String sentSha = getHeader(requestHeaders, X_MS_ENCRYPTION_KEY_SHA256);
  final String receivedSha =
      getHeader(responseHeaders, X_MS_ENCRYPTION_KEY_SHA256);
  if (sentSha.equalsIgnoreCase(receivedSha)) {
    return false;
  }
  LOG.debug("The value sent and received for {} is different. Value "
      + "sent: {}, value received: {}", X_MS_ENCRYPTION_KEY_SHA256,
      sentSha, receivedSha);
  return true;
}

/**
 * Finds the value of the first header whose name matches
 * {@code headerName}, ignoring case.
 *
 * <p>The lookup is null-safe: a {@code null} header list (for example when
 * no response headers have been captured), a {@code null} entry, a
 * {@code null} header name or a {@code null} header value all yield the
 * empty string instead of a {@link NullPointerException}.
 *
 * @param headers headers to search; may be {@code null}
 * @param headerName name to look for; may be {@code null}
 * @return the matching header's value, or {@code ""} when there is no
 *         match; never {@code null}
 */
private String getHeader(List<AbfsHttpHeader> headers, String headerName) {
  if (headers == null || headerName == null) {
    return "";
  }
  for (AbfsHttpHeader header : headers) {
    // Compare from the known non-null side so a null name cannot throw.
    if (header != null && headerName.equalsIgnoreCase(header.getName())) {
      final String value = header.getValue();
      return value != null ? value : "";
    }
  }
  return "";
}

/**
* Incrementing Abfs counters with a long value.
*
Expand All @@ -312,4 +341,8 @@ private void incrementCounter(AbfsStatistic statistic, long value) {
abfsCounters.incrementCounter(statistic, value);
}
}

/**
 * Returns the response headers recorded by this operation.
 *
 * @return the recorded header list; {@code null} if none has been recorded
 */
public List<AbfsHttpHeader> getResponseHeaders() {
  return responseHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void loadConfiguredFileSystem() throws Exception {
this.fileSystemName = authorityParts[0];

// Reset URL with configured filesystem
final String abfsUrl = this.getFileSystemName() + "@" + this.getAccountName();
final String abfsUrl = this.getFileSystemName() + "@" + authorityParts[1];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this change required ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was buggy. authorityParts[1] is the actual FS to be loaded, getAccountName returns the FS that is created dynamically.

URI defaultUri = null;

defaultUri = new URI(abfsScheme, abfsUrl, null, null, null);
Expand Down
Loading