-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-17912. ABFS: Support for Encryption Context #3440
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 19 commits
9dc24eb
bd579f2
59d171f
538820e
09c0fbb
10adf42
84df4a2
7365d82
ffb4764
31f4c0f
af6cd1d
2ad36d3
0e19fd4
4dc1601
8998bc0
8357830
8035ed7
3bf75c3
fdc3149
a3559a3
99cf2bd
f6bb16f
ae6fce6
04a73e0
d96e08d
fb75454
ac6ff51
3caeb7b
a2fc26e
b840da3
72b5f2f
66eb217
1721d43
15a0ce8
2f2947c
16973d4
c27c28d
0e700ca
4164ee2
6c05dac
cd7d055
926e7d9
af5e90b
9505ed9
f3abbb9
a3d31f1
5842b28
1f1d9d1
4f86224
873b3fe
e8c1fb5
ae2d065
4f1e33b
8806ceb
ef092dd
fec93f4
c2d35d8
1e987a5
6364cfb
a1b1906
d35a276
450324a
183f1bc
d0a16bd
a69ee4a
061930b
00bd8c3
a642ef3
56f3efa
adc82f6
241fb9b
9daea68
2313790
aee1c7b
bb45ae9
3c6eddb
a231592
6ab375c
aaf1582
f9658e0
4002da6
1990a48
a2472d5
95b0f9d
0cd8c8c
b718b3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| import java.io.IOException; | ||
| import java.lang.reflect.Field; | ||
|
|
||
| import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; | ||
| import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; | ||
| import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; | ||
|
|
||
|
|
@@ -906,6 +907,36 @@ public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemExceptio | |
| } | ||
| } | ||
|
|
||
| public EncryptionContextProvider initializeEncryptionContextProvider() { | ||
|
||
|
|
||
| try { | ||
| String configKey = FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE; | ||
| if (get(configKey) == null) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this config be strictly per account? Else it might cause problems with instantiating a file system with no intend to use encryption. It will try to create a provider for it anyway. Or am I missing something?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, keeping it account-agnostic would cause other accounts to instantiate EncryptionContextProvider (ECP) as well. Have made the provider config account-specific. However, each account that uses ECP will need to have its own account-specific entry in the config file |
||
| return null; | ||
| } | ||
| Class<? extends EncryptionContextProvider> encryptionContextClass = | ||
| getAccountSpecificClass(configKey, null, EncryptionContextProvider.class); | ||
| if (encryptionContextClass == null) { | ||
| encryptionContextClass = getAccountAgnosticClass(configKey, null, | ||
| EncryptionContextProvider.class); | ||
| } | ||
| Preconditions.checkArgument(encryptionContextClass != null, | ||
| String.format("The configuration value for %s is invalid.", configKey)); | ||
|
|
||
| EncryptionContextProvider encryptionContextProvider = | ||
| ReflectionUtils.newInstance(encryptionContextClass, rawConfig); | ||
| Preconditions.checkArgument(encryptionContextProvider != null, | ||
| String.format("Failed to initialize %s", encryptionContextClass)); | ||
|
|
||
| LOG.trace("Initializing {}", encryptionContextClass.getName()); | ||
| LOG.trace("{} init complete", encryptionContextClass.getName()); | ||
| return encryptionContextProvider; | ||
| } catch (Exception e) { | ||
| throw new IllegalArgumentException("Unable to load encryption context provider class: ", e); | ||
| } | ||
|
|
||
| } | ||
|
|
||
| public int getReadAheadRange() { | ||
| return this.readAheadRange; | ||
| } | ||
|
|
@@ -1009,7 +1040,7 @@ public boolean enableAbfsListIterator() { | |
| } | ||
|
|
||
| public String getClientProvidedEncryptionKey() { | ||
| String accSpecEncKey = accountConf(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY); | ||
| String accSpecEncKey = accountConf(FS_AZURE_ENCRYPTION_CLIENT_PROVIDED_KEY); | ||
| return rawConfig.get(accSpecEncKey, null); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -54,6 +54,9 @@ | |
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; | ||
| import org.apache.hadoop.fs.azurebfs.security.EncryptionAdapter; | ||
| import org.apache.hadoop.fs.azurebfs.utils.EncryptionType; | ||
| import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; | ||
| import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; | ||
| import org.apache.hadoop.thirdparty.com.google.common.base.Strings; | ||
|
|
@@ -460,7 +463,7 @@ public void setFilesystemProperties( | |
| } | ||
|
|
||
| public Hashtable<String, String> getPathStatus(final Path path, | ||
| TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
| TracingContext tracingContext) throws IOException { | ||
| try (AbfsPerfInfo perfInfo = startTracking("getPathStatus", "getPathStatus")){ | ||
| LOG.debug("getPathStatus for filesystem: {} path: {}", | ||
| client.getFileSystem(), | ||
|
|
@@ -483,7 +486,7 @@ public Hashtable<String, String> getPathStatus(final Path path, | |
|
|
||
| public void setPathProperties(final Path path, | ||
| final Hashtable<String, String> properties, TracingContext tracingContext) | ||
| throws AzureBlobFileSystemException { | ||
| throws IOException { | ||
| try (AbfsPerfInfo perfInfo = startTracking("setPathProperties", "setPathProperties")){ | ||
| LOG.debug("setFilesystemProperties for filesystem: {} path: {} with properties: {}", | ||
| client.getFileSystem(), | ||
|
|
@@ -554,13 +557,16 @@ public OutputStream createFile(final Path path, | |
| triggerConditionalCreateOverwrite = true; | ||
| } | ||
|
|
||
| EncryptionAdapter encryptionAdapter = new EncryptionAdapter( | ||
| client.getEncryptionContextProvider(), getRelativePath(path)); | ||
| AbfsRestOperation op; | ||
| if (triggerConditionalCreateOverwrite) { | ||
| op = conditionalCreateOverwriteFile(relativePath, | ||
| statistics, | ||
| isNamespaceEnabled ? getOctalNotation(permission) : null, | ||
| isNamespaceEnabled ? getOctalNotation(umask) : null, | ||
| isAppendBlob, | ||
| encryptionAdapter, | ||
| tracingContext | ||
| ); | ||
|
|
||
|
|
@@ -571,6 +577,7 @@ public OutputStream createFile(final Path path, | |
| isNamespaceEnabled ? getOctalNotation(umask) : null, | ||
| isAppendBlob, | ||
| null, | ||
| encryptionAdapter, | ||
| tracingContext); | ||
|
|
||
| } | ||
|
|
@@ -586,6 +593,7 @@ public OutputStream createFile(final Path path, | |
| statistics, | ||
| relativePath, | ||
| 0, | ||
| encryptionAdapter, | ||
| tracingContext)); | ||
| } | ||
| } | ||
|
|
@@ -606,15 +614,16 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa | |
| final String permission, | ||
| final String umask, | ||
| final boolean isAppendBlob, | ||
| TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
| EncryptionAdapter encryptionAdapter, | ||
| TracingContext tracingContext) throws IOException { | ||
| AbfsRestOperation op; | ||
|
|
||
| try { | ||
| // Trigger a create with overwrite=false first so that eTag fetch can be | ||
| // avoided for cases when no pre-existing file is present (major portion | ||
| // of create file traffic falls into the case of no pre-existing file). | ||
| op = client.createPath(relativePath, true, false, permission, umask, | ||
| isAppendBlob, null, tracingContext); | ||
| isAppendBlob, null, encryptionAdapter, tracingContext); | ||
|
|
||
| } catch (AbfsRestOperationException e) { | ||
| if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { | ||
|
|
@@ -639,7 +648,7 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa | |
| try { | ||
| // overwrite only if eTag matches with the file properties fetched befpre | ||
| op = client.createPath(relativePath, true, true, permission, umask, | ||
| isAppendBlob, eTag, tracingContext); | ||
| isAppendBlob, eTag, encryptionAdapter, tracingContext); | ||
| } catch (AbfsRestOperationException ex) { | ||
| if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) { | ||
| // Is a parallel access case, as file with eTag was just queried | ||
|
|
@@ -682,6 +691,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext( | |
| FileSystem.Statistics statistics, | ||
| String path, | ||
| long position, | ||
| EncryptionAdapter encryptionAdapter, | ||
| TracingContext tracingContext) { | ||
| int bufferSize = abfsConfiguration.getWriteBufferSize(); | ||
| if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) { | ||
|
|
@@ -697,6 +707,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext( | |
| .withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount()) | ||
| .withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue()) | ||
| .withLease(lease) | ||
| .withEncryptionAdapter(encryptionAdapter) | ||
| .withBlockFactory(blockFactory) | ||
| .withBlockOutputActiveBlocks(blockOutputActiveBlocks) | ||
| .withClient(client) | ||
|
|
@@ -711,7 +722,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext( | |
|
|
||
| public void createDirectory(final Path path, final FsPermission permission, | ||
| final FsPermission umask, TracingContext tracingContext) | ||
| throws AzureBlobFileSystemException { | ||
| throws IOException { | ||
| try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) { | ||
| boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext); | ||
| LOG.debug("createDirectory filesystem: {} path: {} permission: {} umask: {} isNamespaceEnabled: {}", | ||
|
|
@@ -726,8 +737,8 @@ public void createDirectory(final Path path, final FsPermission permission, | |
| final AbfsRestOperation op = client.createPath(getRelativePath(path), | ||
| false, overwrite, | ||
| isNamespaceEnabled ? getOctalNotation(permission) : null, | ||
| isNamespaceEnabled ? getOctalNotation(umask) : null, false, null, | ||
| tracingContext); | ||
| isNamespaceEnabled ? getOctalNotation(umask) : null, false, | ||
| null, null, tracingContext); | ||
| perfInfo.registerResult(op.getResult()).registerSuccess(true); | ||
| } | ||
| } | ||
|
|
@@ -753,7 +764,9 @@ public AbfsInputStream openFileForRead(Path path, | |
| String relativePath = getRelativePath(path); | ||
| String resourceType, eTag; | ||
| long contentLength; | ||
| if (fileStatus instanceof VersionedFileStatus) { | ||
| EncryptionAdapter encryptionAdapter = null; | ||
| if (fileStatus instanceof VersionedFileStatus | ||
| && client.getEncryptionType() != EncryptionType.ENCRYPTION_CONTEXT) { | ||
| path = path.makeQualified(this.uri, path); | ||
| Preconditions.checkArgument(fileStatus.getPath().equals(path), | ||
| String.format( | ||
|
|
@@ -764,9 +777,8 @@ public AbfsInputStream openFileForRead(Path path, | |
| eTag = ((VersionedFileStatus) fileStatus).getVersion(); | ||
| } else { | ||
| if (fileStatus != null) { | ||
| LOG.warn( | ||
| "Fallback to getPathStatus REST call as provided filestatus " | ||
| + "is not of type VersionedFileStatus"); | ||
| LOG.debug("Fallback to getPathStatus REST call as provided fileStatus " | ||
| + "is not of type VersionedFileStatus, or file is encrypted"); | ||
| } | ||
| AbfsHttpOperation op = client.getPathStatus(relativePath, false, | ||
| tracingContext).getResult(); | ||
|
|
@@ -775,6 +787,12 @@ public AbfsInputStream openFileForRead(Path path, | |
| contentLength = Long.parseLong( | ||
| op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); | ||
| eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG); | ||
| if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) { | ||
| encryptionAdapter = new EncryptionAdapter( | ||
| client.getEncryptionContextProvider(), getRelativePath(path), | ||
| op.getResponseHeader(HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT) | ||
| .getBytes(StandardCharsets.UTF_8)); | ||
| } | ||
| } | ||
|
|
||
| if (parseIsDirectory(resourceType)) { | ||
|
|
@@ -790,13 +808,13 @@ public AbfsInputStream openFileForRead(Path path, | |
| // Add statistics for InputStream | ||
| return new AbfsInputStream(client, statistics, relativePath, | ||
| contentLength, populateAbfsInputStreamContext( | ||
| parameters.map(OpenFileParameters::getOptions)), | ||
| parameters.map(OpenFileParameters::getOptions), encryptionAdapter), | ||
|
||
| eTag, tracingContext); | ||
| } | ||
| } | ||
|
|
||
| private AbfsInputStreamContext populateAbfsInputStreamContext( | ||
| Optional<Configuration> options) { | ||
| Optional<Configuration> options, EncryptionAdapter encryptionAdapter) { | ||
| boolean bufferedPreadDisabled = options | ||
| .map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false)) | ||
| .orElse(false); | ||
|
|
@@ -812,6 +830,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext( | |
| abfsConfiguration.shouldReadBufferSizeAlways()) | ||
| .withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize()) | ||
| .withBufferedPreadDisabled(bufferedPreadDisabled) | ||
| .withEncryptionAdapter(encryptionAdapter) | ||
| .build(); | ||
| } | ||
|
|
||
|
|
@@ -851,6 +870,12 @@ public OutputStream openFileForWrite(final Path path, | |
| } | ||
|
|
||
| AbfsLease lease = maybeCreateLease(relativePath, tracingContext); | ||
| byte[] encryptionContext = op.getResult() | ||
| .getResponseHeader(HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT) | ||
| .getBytes(StandardCharsets.UTF_8); | ||
| EncryptionAdapter encryptionAdapter = new EncryptionAdapter( | ||
| client.getEncryptionContextProvider(), getRelativePath(path), | ||
| encryptionContext); | ||
|
|
||
| return new AbfsOutputStream( | ||
| populateAbfsOutputStreamContext( | ||
|
|
@@ -860,6 +885,7 @@ public OutputStream openFileForWrite(final Path path, | |
| statistics, | ||
| relativePath, | ||
| offset, | ||
| encryptionAdapter, | ||
| tracingContext)); | ||
| } | ||
| } | ||
|
|
@@ -877,8 +903,8 @@ public void breakLease(final Path path, final TracingContext tracingContext) thr | |
| client.breakLease(getRelativePath(path), tracingContext); | ||
| } | ||
|
|
||
| public void rename(final Path source, final Path destination, TracingContext tracingContext) throws | ||
| AzureBlobFileSystemException { | ||
| public void rename(final Path source, final Path destination, TracingContext tracingContext) | ||
| throws IOException { | ||
| final Instant startAggregate = abfsPerfTracker.getLatencyInstant(); | ||
| long countAggregate = 0; | ||
| boolean shouldContinue; | ||
|
|
@@ -1590,16 +1616,38 @@ private void initializeClient(URI uri, String fileSystemName, | |
| abfsConfiguration.getRawConfiguration()); | ||
| } | ||
|
|
||
| // Encryption setup | ||
| EncryptionType encryptionType = EncryptionType.NONE; | ||
| EncryptionContextProvider encryptionContextProvider = null; | ||
| if (isSecure) { | ||
| encryptionContextProvider = | ||
| abfsConfiguration.initializeEncryptionContextProvider(); | ||
| if (encryptionContextProvider != null) { | ||
| if (abfsConfiguration.getClientProvidedEncryptionKey() != null) { | ||
| throw new IOException( | ||
|
||
| "Both global key and encryption context are set, only one allowed"); | ||
| } | ||
| encryptionContextProvider.initialize( | ||
| abfsConfiguration.getRawConfiguration(), accountName, | ||
| fileSystemName); | ||
| encryptionType = EncryptionType.ENCRYPTION_CONTEXT; | ||
| } else if (abfsConfiguration.getClientProvidedEncryptionKey() != null) { | ||
| encryptionType = EncryptionType.GLOBAL_KEY; | ||
| } | ||
| } | ||
|
|
||
| LOG.trace("Initializing AbfsClient for {}", baseUrl); | ||
| if (tokenProvider != null) { | ||
| this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, | ||
| tokenProvider, | ||
| tokenProvider, encryptionContextProvider, | ||
| populateAbfsClientContext()); | ||
| } else { | ||
| this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, | ||
| sasTokenProvider, | ||
| sasTokenProvider, encryptionContextProvider, | ||
| populateAbfsClientContext()); | ||
| } | ||
| client.setEncryptionType(encryptionType); | ||
|
|
||
| LOG.trace("AbfsClient init complete"); | ||
| } | ||
|
|
||
|
|
@@ -1622,7 +1670,7 @@ private String getOctalNotation(FsPermission fsPermission) { | |
| return String.format(AbfsHttpConstants.PERMISSION_FORMAT, fsPermission.toOctal()); | ||
| } | ||
|
|
||
| private String getRelativePath(final Path path) { | ||
| public String getRelativePath(final Path path) { | ||
| Preconditions.checkNotNull(path, "path"); | ||
| return path.toUri().getPath(); | ||
| } | ||
|
|
@@ -1640,7 +1688,8 @@ private boolean parseIsDirectory(final String resourceType) { | |
| && resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY); | ||
| } | ||
|
|
||
| private String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, String> properties) throws | ||
| public String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, | ||
|
||
| String> properties) throws | ||
| CharacterCodingException { | ||
| StringBuilder commaSeparatedProperties = new StringBuilder(); | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move down to line 46
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have fixed it.