Encryption integration and test #5544
Conversation
```java
Map<String, String> tableProperties = tableMetadata.properties();
String fileFormat =
    PropertyUtil.propertyAsString(tableProperties, DEFAULT_FILE_FORMAT, "parquet");
if (!fileFormat.equals("parquet")) {
```
FileFormat.fromString should be used to match the general usage pattern. In other cases we accept the upper-case PARQUET too, so I think we should accept it here as well.
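A minimal sketch of the suggested form, reusing the names from the diff above (the rejection branch is elided):

```java
import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.util.PropertyUtil;

// Parse via FileFormat.fromString so "parquet" and "PARQUET" are both accepted.
Map<String, String> tableProperties = tableMetadata.properties();
FileFormat fileFormat =
    FileFormat.fromString(
        PropertyUtil.propertyAsString(
            tableProperties, DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()));
if (fileFormat != FileFormat.PARQUET) {
  // same rejection logic as in the diff
}
```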
SGTM, thanks for the suggestion.
```java
// Important: put catalog properties after table properties. Former overrides the latter.
encryptionProperties.putAll(catalogPropertyMap);

String tableKeyId = System.getProperty(ICEBERG_ENCRYPTION_TABLE_KEY);
```
This seems odd to me.
Do we rely on System properties?
This is basically a hack that addresses the challenge of using different keys for different tables, in those cases where the table properties are not trusted. I'd say the purpose is somewhere between a "temporary solution" and a "discussion trigger" :). In the long run, we might have table properties safely kept in catalogs (e.g. the REST catalog), but we need a solution in the shorter term.
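For concreteness, a sketch of the interim lookup order being described; only ICEBERG_ENCRYPTION_TABLE_KEY appears in the diff, and the table-property fallback name here is hypothetical:

```java
// JVM system property wins over table properties, for deployments where the
// table properties are not trusted. ENCRYPTION_TABLE_KEY is a hypothetical
// table-property name used only for illustration.
String tableKeyId = System.getProperty(ICEBERG_ENCRYPTION_TABLE_KEY);
if (tableKeyId == null) {
  tableKeyId = tableProperties.get(ENCRYPTION_TABLE_KEY);
}
```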
```java
 * @param properties catalog properties
 */
default void initialize(Map<String, String> properties) {}
/**
```
Missing whitespace.
```java
fileIOImpl == null
    ? new HadoopFileIO(conf)
    : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
this.encryptionManagerFactory = new DefaultEncryptionManagerFactory();
```
Is the intent for this to be a StandardEncryptionManagerFactory?
```java
    ? new HadoopFileIO(conf)
    : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
this.encryptionManagerFactory = new DefaultEncryptionManagerFactory();
this.encryptionManagerFactory.initialize(properties);
```
This pattern is typically used for dynamically loaded implementations. Since this is part of the catalog, I think it makes sense to just pass the properties to the encryption manager constructor.
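A sketch of the constructor-injection alternative being suggested (the constructor signature is hypothetical):

```java
// Since the factory is created by the catalog itself rather than loaded
// reflectively, pass the catalog properties straight to the constructor
// instead of calling initialize() afterwards.
this.encryptionManagerFactory = new DefaultEncryptionManagerFactory(properties);
```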
Force-pushed 6bfdf67 to 9902ef6.
@ggershinsky: Do we want to add Flink tests alongside the Spark tests as well?

The current PR sequence develops the encryption tools using Spark as a use case. But once the toolset converges, we'll certainly need the Flink support and unit tests.
Force-pushed 5d43ebf to 2a9ca87.
```java
  return encryptingFileIO;
}

encryptingFileIO = EncryptingFileIO.combine(fileIO, encryption());
```
Maybe there's no need to do this if encryption() is PlaintextEncryptionManager.instance(). Then we won't have to fix the tests with the Hadoop config cast.
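Something like the following guard would do it (a sketch; PlaintextEncryptionManager and EncryptingFileIO.combine are existing Iceberg classes, the surrounding fields follow the diff):

```java
// Skip the EncryptingFileIO wrapper entirely when encryption is a no-op,
// so tests keep the raw (Hadoop-configurable) FileIO.
if (encryption() instanceof PlaintextEncryptionManager) {
  return fileIO;
}

encryptingFileIO = EncryptingFileIO.combine(fileIO, encryption());
return encryptingFileIO;
```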
```java
  return encryptionManager;
}

if (keyManagementClient == null) {
```
What if tableKeyId is not null? We should check and throw an exception.
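A sketch of the suggested validation, assuming the fields from the diff above (the plaintext fallback return is illustrative):

```java
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

if (keyManagementClient == null) {
  // A configured table key without a KMS client should fail loudly rather
  // than silently fall back to plaintext.
  Preconditions.checkState(
      tableKeyId == null,
      "Cannot encrypt table with key %s: no KMS client is configured",
      tableKeyId);
  return PlaintextEncryptionManager.instance();
}
```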
```java
FileIO io = io();
Preconditions.checkArgument(
    io instanceof EncryptingFileIO,
    "Cannot encrypt table metadata because the fileIO (%s) does not "
        + "implement EncryptingFileIO",
    io.getClass());
EncryptingFileIO encryptingIO = (EncryptingFileIO) io();
EncryptedOutputFile newEncryptedMetadataFile =
    encryptingIO.newEncryptingOutputFile(newTableMetadataFilePath);

if (newEncryptedMetadataFile.keyMetadata() == null
    || newEncryptedMetadataFile.keyMetadata().buffer() == null) {
  throw new IllegalStateException("Null key metadata in encrypted table");
}

newMetadataFile = newEncryptedMetadataFile.encryptingOutputFile();
EncryptionManager encryptionManager = encryptingIO.encryptionManager();

Preconditions.checkArgument(
    encryptionManager instanceof StandardEncryptionManager,
    "Cannot encrypt table metadata because the encryption manager (%s) does not "
        + "implement StandardEncryptionManager",
    encryptionManager.getClass());
NativeEncryptionKeyMetadata keyMetadata =
    (NativeEncryptionKeyMetadata) newEncryptedMetadataFile.keyMetadata();
ByteBuffer metadataEncryptionKey = keyMetadata.encryptionKey();
// Wrap (encrypt) metadata file key
ByteBuffer wrappedEncryptionKey =
    ((StandardEncryptionManager) encryptionManager).wrapKey(metadataEncryptionKey);

ByteBuffer metadataAADPrefix = keyMetadata.aadPrefix();
wrappedMetadataKey =
    Base64.getEncoder()
        .encodeToString(
            EncryptionUtil.createKeyMetadata(wrappedEncryptionKey, metadataAADPrefix)
                .buffer()
                .array());
```
Move this logic into a util class (e.g. EncryptionUtil or TableMetadataParser) so it can be re-used by other catalogs.
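The extracted helper could look roughly like this (hypothetical name and placement; the calls match the diff above):

```java
import java.nio.ByteBuffer;
import java.util.Base64;

// Hypothetical shared helper: wrap the metadata file key and encode the
// resulting key metadata, so Hive, REST, and other catalogs can reuse it.
public static String wrapMetadataKey(
    StandardEncryptionManager em, NativeEncryptionKeyMetadata keyMetadata) {
  ByteBuffer wrappedKey = em.wrapKey(keyMetadata.encryptionKey());
  return Base64.getEncoder()
      .encodeToString(
          EncryptionUtil.createKeyMetadata(wrappedKey, keyMetadata.aadPrefix())
              .buffer()
              .array());
}
```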
```java
}

private void checkMetadataFileEncryption(InputFile file) throws IOException {
  SeekableInputStream stream = file.newStream();
```
Use try-with-resources.
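That is, close the stream even when the verification throws (a sketch; the verification body is elided):

```java
private void checkMetadataFileEncryption(InputFile file) throws IOException {
  try (SeekableInputStream stream = file.newStream()) {
    // ... verify the file content is actually encrypted ...
  }
}
```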
```java
HiveOperationsBase.validateTableIsIceberg(table, fullName);

metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
// TODO do we need to lock/unlock Hive table, to get all 3 params in one atomic operation?
```
We probably don't need to lock but we do want to retry if we fail because we got non-synchronized values here.
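A rough sketch of retrying instead of locking, using Iceberg's Tasks utility; the helper name and the exception used to signal inconsistent parameters are illustrative:

```java
import org.apache.iceberg.util.Tasks;

// Re-read the HMS parameters a bounded number of times; throw (and retry)
// if the values do not belong to the same committed metadata version.
Tasks.foreach(fullName)
    .retry(3)
    .onlyRetryOn(IllegalStateException.class)
    .run(name -> readAndValidateHmsParameters(name)); // hypothetical helper
```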
Commits in this push:
- Co-Authored-By: Jian Tang <[email protected]>
- rename to default encr mng factory
- test fix for: CTAS produces unencrypted files
- sync with design
- check file format in table props
- use new kms interface
- refactor common KMS client for unitests
- refactor
- update
- move factory to another PR
- update
Force-pushed 57c1a4b to 3cbc0c2.
```java
if (node.has(SNAPSHOT_KEK)) {
  String wrappedKeyEncryptionKey = JsonUtil.getString(SNAPSHOT_KEK, node);
  if (wrappedKeyEncryptionKey != null) {
```
No need for this check.
```java
}

if (encryptionKeyId == null) {
  encryptionPropsFromHms();
```
This LGTM now.

The previous approach (i.e., calling dekLength() in BaseMetastoreTableOperations) would only set encryptionDekLength on the write path (the writeNewMetadataFile method). Therefore, Hive catalog readers would get the default -1 value.
.palantir/revapi.yml (outdated)
```yaml
- code: "java.class.defaultSerializationChanged"
  old: "class org.apache.iceberg.encryption.EncryptingFileIO"
  new: "class org.apache.iceberg.encryption.EncryptingFileIO"
  justification: "Decrypting input mist receive length"
```
Typo: mist
.palantir/revapi.yml (outdated)
```yaml
  justification: "Decrypting input mist receive length"
- code: "java.method.removed"
  old: "method org.apache.iceberg.io.InputFile org.apache.iceberg.encryption.EncryptingFileIO::newDecryptingInputFile(java.lang.String,\
    \ java.nio.ByteBuffer)"
```
I agree that we want to remove this, but we can't make changes to the API like this until a new major release. Can you please deprecate the method instead, and make a note that it will be removed in 2.0.0?
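The deprecation could look like this (a sketch; the delegating body is illustrative and only acceptable for the deprecated compatibility path, since it pulls the length from storage rather than from trusted metadata):

```java
/**
 * @deprecated will be removed in 2.0.0; use
 *     {@link #newDecryptingInputFile(String, long, ByteBuffer)} instead,
 *     which takes the length from trusted table metadata.
 */
@Deprecated
public InputFile newDecryptingInputFile(String path, ByteBuffer buffer) {
  // Illustrative body: delegate to the length-taking overload.
  return newDecryptingInputFile(path, newInputFile(path).getLength(), buffer);
}
```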
```java
InputFile inputFile = io.newInputFile(path, length);

if (inputFile.getLength() != length) {
```
This should not call inputFile.getLength because it will either return the length passed in above, making it useless (newInputFile(path, length)), or it will make a call to the underlying storage (a needless HEAD request). Don't we already catch cases where the file has been truncated?

Hm. I don't see a test in TestGcmStreams, so we should probably add one that validates truncated streams specifically.
> I don't see a test in TestGcmStreams so we should probably add one that validates truncated streams specifically.

I'm not sure how. If a file is truncated by exactly one block, GCM Streams won't detect that. The upper layer (Avro or JSON readers, etc.) might detect it or might not; it's not guaranteed. That's why we added an explicit requirement in the GCM Stream spec to take the length from a trusted source. This is "out of band" from the spec's point of view, meaning that we must make sure the length comes from parent metadata (and not from the file system) everywhere in Iceberg where we decrypt a stream.

However, FileIO.newInputFile(path, length) implementations are custom; some of them simply ignore the length parameter, and then indeed send a file system request upon an InputFile.getLength() call. But we can prevent a security breach by making this check (if (inputFile.getLength() != length)). In most cases this won't trigger a HEAD request, because most FileIO implementations don't ignore the length parameter in newInputFile(path, length) and store it instead. For the few that do ignore it, we need to verify that the file length wasn't truncated in the file system.
```java
    TableMetadata metadata, OutputFile outputFile, boolean overwrite) {
  boolean isGzip = Codec.fromFileName(outputFile.location()) == Codec.GZIP;
  OutputStream stream = overwrite ? outputFile.createOrOverwrite() : outputFile.create();

```
Nit: unnecessary whitespace change.
```java
public static TableMetadata read(FileIO io, InputFile file) {
  EncryptionManager encryption =
      (io instanceof EncryptingFileIO) ? ((EncryptingFileIO) io).encryptionManager() : null;
```
There should be no need for most of the changes in this file after my recommendations on #7770.
```java
private String encryptionKeyId;
private int encryptionDekLength;

/** Tests only */
```
I'll remove this comment.
```java
  return snapshots;
}

public void setKekCache(Map<String, KeyEncryptionKey> kekCache) {
```
I'll change this to withKekCache, making the implementation similar to withUUID (via new Builder).
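The withUUID-style shape would look roughly like this (the Builder method is hypothetical):

```java
// Immutable-style variant: return a new TableMetadata instead of mutating,
// mirroring withUUID(); Builder.setKekCache is a hypothetical counterpart.
public TableMetadata withKekCache(Map<String, KeyEncryptionKey> kekCache) {
  return new Builder(this).setKekCache(kekCache).build();
}
```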
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
```java
if (catalogProperties.containsKey(CatalogProperties.ENCRYPTION_KMS_IMPL)) {
  this.keyManagementClient = EncryptionUtil.createKmsClient(properties);
  this.writerKekTimeout =
```
- Wanna keep this PR active.
- (nit) Maybe it makes sense to add the unit to the variable name, i.e. writerKekTimeoutMs? For example, in LockManagers we have:

```java
this.heartbeatIntervalMs =
    PropertyUtil.propertyAsLong(
        properties,
        CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS,
        CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT);
```
@ggershinsky is this PR still active? Could you please rebase and fix the merge conflicts? I'll review this afterwards.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
Replaced with #13066. Initially, only for Hive catalogs.