diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 8929c86d55ae..acd1866ba59b 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1177,6 +1177,10 @@ acceptedBreaks: old: "class org.apache.iceberg.Metrics" new: "class org.apache.iceberg.Metrics" justification: "Java serialization across versions is not guaranteed" + - code: "java.class.defaultSerializationChanged" + old: "class org.apache.iceberg.encryption.EncryptingFileIO" + new: "class org.apache.iceberg.encryption.EncryptingFileIO" + justification: "New method for Manifest List reading" org.apache.iceberg:iceberg-core: - code: "java.method.removed" old: "method java.lang.String[] org.apache.iceberg.hadoop.Util::blockLocations(org.apache.iceberg.CombinedScanTask,\ diff --git a/api/src/main/java/org/apache/iceberg/ManifestListFile.java b/api/src/main/java/org/apache/iceberg/ManifestListFile.java new file mode 100644 index 000000000000..e727a35a4e09 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/ManifestListFile.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import java.nio.ByteBuffer; +import org.apache.iceberg.encryption.EncryptionManager; + +public interface ManifestListFile { + + /** Returns the location of the manifest list file. */ + String location(); + + /** Returns the ID of the manifest list encryption key, or null if the list is not encrypted. */ + String encryptionKeyID(); + + /** Decrypts the manifest list key metadata using the given manager and returns it. */ + ByteBuffer decryptKeyMetadata(EncryptionManager em); +} diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java index d3de7b1f84a3..a4c708570dba 100644 --- a/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java +++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java @@ -28,6 +28,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -108,13 +109,21 @@ public InputFile newInputFile(ManifestFile manifest) { } } + @Override + public InputFile newInputFile(ManifestListFile manifestList) { + if (manifestList.encryptionKeyID() != null) { + ByteBuffer keyMetadata = manifestList.decryptKeyMetadata(em); + return newDecryptingInputFile(manifestList.location(), keyMetadata); + } else { + return newInputFile(manifestList.location()); + } + } + public InputFile newDecryptingInputFile(String path, ByteBuffer buffer) { return em.decrypt(wrap(io.newInputFile(path), buffer)); } public InputFile newDecryptingInputFile(String path, long length, ByteBuffer buffer) { - // TODO: is the length correct for the encrypted file? 
It may be the length of the plaintext - // stream return em.decrypt(wrap(io.newInputFile(path, length), buffer)); } diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java b/api/src/main/java/org/apache/iceberg/io/FileIO.java index f5404b9e5a78..78b61f60be6b 100644 --- a/api/src/main/java/org/apache/iceberg/io/FileIO.java +++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java @@ -24,6 +24,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** @@ -70,6 +71,15 @@ default InputFile newInputFile(ManifestFile manifest) { return newInputFile(manifest.path(), manifest.length()); } + default InputFile newInputFile(ManifestListFile manifestList) { + Preconditions.checkArgument( + manifestList.encryptionKeyID() == null, + "Cannot decrypt manifest list: %s (use EncryptingFileIO)", + manifestList.location()); + // cannot pass length because it is not tracked outside of key metadata + return newInputFile(manifestList.location()); + } + /** Get a {@link OutputFile} instance to write bytes to the file at the given path. */ OutputFile newOutputFile(String path); diff --git a/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java b/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java new file mode 100644 index 000000000000..e0ecfd50c863 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BaseManifestListFile.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; + +class BaseManifestListFile implements ManifestListFile, Serializable { + private final String location; + private final String encryptionKeyID; + + BaseManifestListFile(String location, String encryptionKeyID) { + this.location = location; + this.encryptionKeyID = encryptionKeyID; + } + + @Override + public String location() { + return location; + } + + @Override + public String encryptionKeyID() { + return encryptionKeyID; + } + + @Override + public ByteBuffer decryptKeyMetadata(EncryptionManager em) { + return EncryptionUtil.decryptManifestListKeyMetadata(this, em); + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java index b97b15d65221..28a45d2c7821 100644 --- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java @@ -182,7 +182,9 @@ private void cacheManifests(FileIO fileIO) { if (allManifests == null) { // if manifests isn't set, then the snapshotFile is set and should be read to get the list - this.allManifests = ManifestLists.read(fileIO.newInputFile(manifestListLocation)); + this.allManifests = + ManifestLists.read( + fileIO.newInputFile(new BaseManifestListFile(manifestListLocation, keyId))); } if (dataManifests == null || deleteManifests == null) { diff --git 
a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java index c2cb1bf8c85d..a7644d3bef2a 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java @@ -21,6 +21,10 @@ import java.io.IOException; import java.util.Iterator; import java.util.Map; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata; +import org.apache.iceberg.encryption.NativeEncryptionOutputFile; +import org.apache.iceberg.encryption.StandardEncryptionManager; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.OutputFile; @@ -29,9 +33,25 @@ abstract class ManifestListWriter implements FileAppender { private final FileAppender writer; + private final StandardEncryptionManager standardEncryptionManager; + private final NativeEncryptionKeyMetadata manifestListKeyMetadata; + private final OutputFile outputFile; + + private ManifestListWriter( + OutputFile file, EncryptionManager encryptionManager, Map meta) { + if (encryptionManager instanceof StandardEncryptionManager) { + // ability to encrypt the manifest list key is introduced for standard encryption. 
+ this.standardEncryptionManager = (StandardEncryptionManager) encryptionManager; + NativeEncryptionOutputFile encryptedFile = this.standardEncryptionManager.encrypt(file); + this.outputFile = encryptedFile.encryptingOutputFile(); + this.manifestListKeyMetadata = encryptedFile.keyMetadata(); + } else { + this.standardEncryptionManager = null; + this.outputFile = file; + this.manifestListKeyMetadata = null; + } - private ManifestListWriter(OutputFile file, Map meta) { - this.writer = newAppender(file, meta); + this.writer = newAppender(outputFile, meta); } protected abstract ManifestFile prepare(ManifestFile manifest); @@ -73,18 +93,31 @@ public Long nextRowId() { return null; } + public ManifestListFile toManifestListFile() { + if (manifestListKeyMetadata != null && manifestListKeyMetadata.encryptionKey() != null) { + // copyWithLength returns a new instance; pass the copy on so the file length is not dropped + NativeEncryptionKeyMetadata sized = manifestListKeyMetadata.copyWithLength(writer.length()); + String manifestListKeyID = standardEncryptionManager.addManifestListKeyMetadata(sized); + return new BaseManifestListFile(outputFile.location(), manifestListKeyID); + } else { + return new BaseManifestListFile(outputFile.location(), null); + } + } + static class V3Writer extends ManifestListWriter { private final V3Metadata.ManifestFileWrapper wrapper; private Long nextRowId; V3Writer( OutputFile snapshotFile, + EncryptionManager encryptionManager, long snapshotId, Long parentSnapshotId, long sequenceNumber, long firstRowId) { super( snapshotFile, + encryptionManager, ImmutableMap.of( "snapshot-id", String.valueOf(snapshotId), "parent-snapshot-id", String.valueOf(parentSnapshotId), @@ -134,15 +167,22 @@ public Long nextRowId() { static class V2Writer extends ManifestListWriter { private final V2Metadata.ManifestFileWrapper wrapper; - V2Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) { + V2Writer( + OutputFile snapshotFile, + EncryptionManager encryptionManager, + long snapshotId, + Long parentSnapshotId, + long sequenceNumber) { 
super( snapshotFile, + encryptionManager, ImmutableMap.of( "snapshot-id", String.valueOf(snapshotId), "parent-snapshot-id", String.valueOf(parentSnapshotId), "sequence-number", String.valueOf(sequenceNumber), "format-version", "2")); this.wrapper = new V2Metadata.ManifestFileWrapper(snapshotId, sequenceNumber); + // todo encryption only in v3? throw exception if e.manager is not plaintext? } @Override @@ -170,13 +210,19 @@ protected FileAppender newAppender(OutputFile file, Map read(InputFile manifestList) { } } + // or should we modify all related tests (to pass PlaintextEncryptionManager)? + @VisibleForTesting static ManifestListWriter write( int formatVersion, OutputFile manifestListFile, @@ -54,19 +59,43 @@ static ManifestListWriter write( Long parentSnapshotId, long sequenceNumber, Long firstRowId) { + return write( + formatVersion, + manifestListFile, + PlaintextEncryptionManager.instance(), + snapshotId, + parentSnapshotId, + sequenceNumber, + firstRowId); + } + + static ManifestListWriter write( + int formatVersion, + OutputFile manifestListFile, + EncryptionManager encryptionManager, + long snapshotId, + Long parentSnapshotId, + long sequenceNumber, + Long firstRowId) { switch (formatVersion) { case 1: Preconditions.checkArgument( sequenceNumber == TableMetadata.INITIAL_SEQUENCE_NUMBER, "Invalid sequence number for v1 manifest list: %s", sequenceNumber); - return new ManifestListWriter.V1Writer(manifestListFile, snapshotId, parentSnapshotId); + return new ManifestListWriter.V1Writer( + manifestListFile, encryptionManager, snapshotId, parentSnapshotId); case 2: return new ManifestListWriter.V2Writer( - manifestListFile, snapshotId, parentSnapshotId, sequenceNumber); + manifestListFile, encryptionManager, snapshotId, parentSnapshotId, sequenceNumber); case 3: return new ManifestListWriter.V3Writer( - manifestListFile, snapshotId, parentSnapshotId, sequenceNumber, firstRowId); + manifestListFile, + encryptionManager, + snapshotId, + parentSnapshotId, + 
sequenceNumber, + firstRowId); } throw new UnsupportedOperationException( "Cannot write manifest list for table version: " + formatVersion); diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index 95064759ebe9..06293964ca39 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.OutputFile; @@ -38,7 +40,7 @@ public abstract class ManifestWriter> implements FileAp static final long UNASSIGNED_SEQ = -1L; private final OutputFile file; - private final ByteBuffer keyMetadataBuffer; + private final EncryptionKeyMetadata keyMetadata; private final int specId; private final FileAppender> writer; private final Long snapshotId; @@ -65,7 +67,7 @@ private ManifestWriter( new GenericManifestEntry<>(V1Metadata.entrySchema(spec.partitionType()).asStruct()); this.stats = new PartitionSummary(spec); this.firstRowId = firstRowId; - this.keyMetadataBuffer = (file.keyMetadata() == null) ? 
null : file.keyMetadata().buffer(); + this.keyMetadata = file.keyMetadata(); } protected abstract ManifestEntry prepare(ManifestEntry entry); @@ -192,6 +194,18 @@ public long length() { public ManifestFile toManifestFile() { Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed"); + + // if key metadata can store the length, add it + ByteBuffer keyMetadataBuffer; + if (keyMetadata instanceof NativeEncryptionKeyMetadata) { + keyMetadataBuffer = + ((NativeEncryptionKeyMetadata) keyMetadata).copyWithLength(length()).buffer(); + } else if (keyMetadata != null) { + keyMetadataBuffer = keyMetadata.buffer(); + } else { + keyMetadataBuffer = null; + } + // if the minSequenceNumber is null, then no manifests with a sequence number have been written, // so the min data sequence number is the one that will be assigned when this is committed. // pass UNASSIGNED_SEQ to inherit it. diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java index bd57c6277528..8cb32e395d20 100644 --- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java +++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java @@ -30,6 +30,9 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptingFileIO; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; @@ -248,10 +251,16 @@ public static RewriteResult rewriteManifestList( mf.path(), sourcePrefix)); + EncryptionManager encryptionManager = + (io instanceof EncryptingFileIO) + ? 
((EncryptingFileIO) io).encryptionManager() + : PlaintextEncryptionManager.instance(); + try (FileAppender writer = ManifestLists.write( tableMetadata.formatVersion(), outputFile, + encryptionManager, snapshot.snapshotId(), snapshot.parentId(), snapshot.sequenceNumber(), diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 118ae0b328a5..6cb7d0b00d83 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -266,6 +266,7 @@ public Snapshot apply() { ManifestLists.write( ops.current().formatVersion(), manifestList, + ops.encryption(), snapshotId(), parentSnapshotId, sequenceNumber, @@ -323,7 +324,7 @@ public Snapshot apply() { manifestList.location(), nextRowId, assignedRows, - null); + writer.toManifestListFile().encryptionKeyID()); } protected abstract Map summary(); @@ -799,4 +800,37 @@ private static void updateTotal( } } } + + /** + * A wrapper to set the dataSequenceNumber of a DeleteFile. + * + * @deprecated will be removed in 1.10.0; use {@link Delegates#pendingDeleteFile(DeleteFile, + * Long)} instead. + */ + @Deprecated + protected static class PendingDeleteFile extends Delegates.PendingDeleteFile { + /** + * Wrap a delete file for commit with a given data sequence number. + * + * @param deleteFile delete file + * @param dataSequenceNumber data sequence number to apply + */ + PendingDeleteFile(DeleteFile deleteFile, long dataSequenceNumber) { + super(deleteFile, dataSequenceNumber); + } + + /** + * Wrap a delete file for commit with the latest sequence number. 
+ * + * @param deleteFile delete file + */ + PendingDeleteFile(DeleteFile deleteFile) { + super(deleteFile, null); + } + + @Override + PendingDeleteFile wrap(DeleteFile file) { + return new PendingDeleteFile(file, dataSequenceNumber()); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java index a43643fcc779..b03944859b6e 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java +++ b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputFile.java @@ -26,20 +26,34 @@ public class AesGcmInputFile implements InputFile { private final InputFile sourceFile; private final byte[] dataKey; private final byte[] fileAADPrefix; - private long plaintextLength; + private Long encryptedLength; + private Long plaintextLength; public AesGcmInputFile(InputFile sourceFile, byte[] dataKey, byte[] fileAADPrefix) { + this(sourceFile, dataKey, fileAADPrefix, null); + } + + public AesGcmInputFile(InputFile sourceFile, byte[] dataKey, byte[] fileAADPrefix, Long length) { this.sourceFile = sourceFile; this.dataKey = dataKey; this.fileAADPrefix = fileAADPrefix; - this.plaintextLength = -1; + this.encryptedLength = length; + this.plaintextLength = null; + } + + private long encryptedLength() { + if (encryptedLength == null) { + this.encryptedLength = sourceFile.getLength(); + } + + return encryptedLength; } @Override public long getLength() { - if (plaintextLength == -1) { + if (plaintextLength == null) { // Presumes all streams use hard-coded plaintext block size. 
- plaintextLength = AesGcmInputStream.calculatePlaintextLength(sourceFile.getLength()); + plaintextLength = AesGcmInputStream.calculatePlaintextLength(encryptedLength()); } return plaintextLength; @@ -47,7 +61,7 @@ public long getLength() { @Override public SeekableInputStream newStream() { - long ciphertextLength = sourceFile.getLength(); + long ciphertextLength = encryptedLength(); Preconditions.checkState( ciphertextLength >= Ciphers.MIN_STREAM_LENGTH, "Invalid encrypted stream: %d is shorter than the minimum possible stream length", diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java index 922cac455d3d..359d439e6f95 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -18,13 +18,17 @@ */ package org.apache.iceberg.encryption; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.TableProperties; import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.PropertyUtil; public class EncryptionUtil { @@ -83,7 +87,7 @@ public static EncryptionManager createEncryptionManager( return createEncryptionManager(List.of(), tableProperties, kmsClient); } - static EncryptionManager createEncryptionManager( + public static EncryptionManager createEncryptionManager( List keys, Map tableProperties, KeyManagementClient kmsClient) { Preconditions.checkArgument(kmsClient != null, "Invalid KMS client: null"); String tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY); @@ -104,10 +108,59 @@ static 
EncryptionManager createEncryptionManager( "Invalid data key length: %s (must be 16, 24, or 32)", dataKeyLength); - return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient); + return new StandardEncryptionManager(keys, tableKeyId, dataKeyLength, kmsClient); } public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) { return new BaseEncryptedOutputFile(encryptingOutputFile, EncryptionKeyMetadata.empty()); } + + /** + * Decrypt the key metadata for a manifest list. + * + * @param manifestList a ManifestListFile + * @param em the table's EncryptionManager + * @return a decrypted key metadata buffer + */ + public static ByteBuffer decryptManifestListKeyMetadata( + ManifestListFile manifestList, EncryptionManager em) { + Preconditions.checkState( + em instanceof StandardEncryptionManager, + "Snapshot key metadata encryption requires a StandardEncryptionManager"); + StandardEncryptionManager sem = (StandardEncryptionManager) em; + String manifestListKeyId = manifestList.encryptionKeyID(); + ByteBuffer keyEncryptionKey = sem.encryptedByKey(manifestListKeyId); + ByteBuffer encryptedKeyMetadata = sem.encryptedKeyMetadata(manifestListKeyId); + + Ciphers.AesGcmDecryptor decryptor = + new Ciphers.AesGcmDecryptor(ByteBuffers.toByteArray(keyEncryptionKey)); + byte[] keyMetadataBytes = ByteBuffers.toByteArray(encryptedKeyMetadata); + byte[] decryptedKeyMetadata = + decryptor.decrypt(keyMetadataBytes, manifestListKeyId.getBytes(StandardCharsets.UTF_8)); + return ByteBuffer.wrap(decryptedKeyMetadata); + } + + /** + * Encrypts the key metadata for a manifest list. 
+ * + * @param key key encryption key bytes + * @param keyId ID of the manifest list key + * @param keyMetadata manifest list key metadata + * @return encrypted key metadata + */ + static ByteBuffer encryptManifestListKeyMetadata( + ByteBuffer key, String keyId, EncryptionKeyMetadata keyMetadata) { + Ciphers.AesGcmEncryptor encryptor = new Ciphers.AesGcmEncryptor(ByteBuffers.toByteArray(key)); + byte[] keyMetadataBytes = ByteBuffers.toByteArray(keyMetadata.buffer()); + byte[] encryptedKeyMetadata = + encryptor.encrypt(keyMetadataBytes, keyId.getBytes(StandardCharsets.UTF_8)); + return ByteBuffer.wrap(encryptedKeyMetadata); + } + + public static Map encryptionKeys(EncryptionManager em) { + Preconditions.checkState( + em instanceof StandardEncryptionManager, + "Encryption keys are only available for StandardEncryptionManager"); + return ((StandardEncryptionManager) em).encryptionKeys(); + } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java b/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java index a7fb494cc8e1..6f834c69ed86 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java +++ b/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java @@ -24,7 +24,7 @@ import java.util.Map; /** A minimum client interface to connect to a key management service (KMS). */ -interface KeyManagementClient extends Serializable, Closeable { +public interface KeyManagementClient extends Serializable, Closeable { /** * Wrap a secret key, using a wrapping/master key which is stored in KMS and referenced by an ID. 
diff --git a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java index c2ed9d564d1e..2188378a4e87 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java +++ b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java @@ -27,4 +27,21 @@ public interface NativeEncryptionKeyMetadata extends EncryptionKeyMetadata { /** Additional authentication data as a {@link ByteBuffer} */ ByteBuffer aadPrefix(); + + /** Encrypted file length; the default implementation throws UnsupportedOperationException. */ + default Long fileLength() { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement fileLength"); + } + + /** + * Copy this key metadata and set the file length; the default implementation always throws. + * + * @param length length of the encrypted file in bytes + * @return a copy of this key metadata (key and AAD) with the file length + */ + default NativeEncryptionKeyMetadata copyWithLength(long length) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement copyWithLength"); + } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 119d2a5f9ae2..180114021ceb 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -18,30 +18,80 @@ */ package org.apache.iceberg.encryption; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import java.nio.ByteBuffer; import java.security.SecureRandom; +import java.util.Base64; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.iceberg.TableProperties; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; 
import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.ByteBuffers; public class StandardEncryptionManager implements EncryptionManager { - private final transient KeyManagementClient kmsClient; + private static final String KEY_ENCRYPTION_KEY_ID = "KEY_ENCRYPTION_KEY_ID"; + private final String tableKeyId; private final int dataKeyLength; + // a holder class of metadata that is not available after serialization + private class TransientEncryptionState { + private final KeyManagementClient kmsClient; + private final Map encryptionKeys; + private final LoadingCache unwrappedKeyCache; + + private TransientEncryptionState(List keys, KeyManagementClient kmsClient) { + this.kmsClient = kmsClient; + this.encryptionKeys = Maps.newLinkedHashMap(); + + for (EncryptedKey key : keys) { + Preconditions.checkArgument( + key.keyId() != null, "Key id cannot be null"); // Required by spec. + encryptionKeys.put(key.keyId(), key); + } + + this.unwrappedKeyCache = + Caffeine.newBuilder() + .expireAfterWrite(1, TimeUnit.HOURS) + .build( + keyId -> + kmsClient.unwrapKey( + encryptionKeys.get(keyId).encryptedKeyMetadata(), tableKeyId)); + } + } + + private final transient TransientEncryptionState transientState; + private transient volatile SecureRandom lazyRNG = null; /** + * @deprecated will be removed in 1.11.0; use {@link #StandardEncryptionManager(List, String, int, + * KeyManagementClient)} instead. 
+ */ + @Deprecated + public StandardEncryptionManager( + String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { + this(List.of(), tableKeyId, dataKeyLength, kmsClient); + } + + /** + * @param keys a list of existing {@link EncryptedKey}s for this {@link EncryptionManager} to use * @param tableKeyId table encryption key id * @param dataKeyLength length of data encryption key (16/24/32 bytes) * @param kmsClient Client of KMS used to wrap/unwrap keys in envelope encryption */ public StandardEncryptionManager( - String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { + List keys, + String tableKeyId, + int dataKeyLength, + KeyManagementClient kmsClient) { Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null"); Preconditions.checkArgument( dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32, @@ -49,7 +99,7 @@ public StandardEncryptionManager( dataKeyLength); Preconditions.checkNotNull(kmsClient, "Invalid KMS client: null"); this.tableKeyId = tableKeyId; - this.kmsClient = kmsClient; + this.transientState = new TransientEncryptionState(keys, kmsClient); this.dataKeyLength = dataKeyLength; } @@ -81,22 +131,114 @@ private SecureRandom workerRNG() { return lazyRNG; } + /** + * @deprecated will be removed in 1.11.0. + */ + @Deprecated public ByteBuffer wrapKey(ByteBuffer secretKey) { - if (kmsClient == null) { + if (transientState == null) { throw new IllegalStateException( "Cannot wrap key after called after serialization (missing KMS client)"); } - return kmsClient.wrapKey(secretKey, tableKeyId); + return transientState.kmsClient.wrapKey(secretKey, tableKeyId); } + /** + * @deprecated will be removed in 1.11.0. 
+ */ + @Deprecated public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) { - if (kmsClient == null) { + if (transientState == null) { + throw new IllegalStateException("Cannot unwrap key after serialization"); + } + + return transientState.kmsClient.unwrapKey(wrappedSecretKey, tableKeyId); + } + + private String keyEncryptionKeyID() { + if (transientState == null) { + throw new IllegalStateException("Cannot return the current key after serialization"); + } + + if (!transientState.encryptionKeys.containsKey(KEY_ENCRYPTION_KEY_ID)) { + ByteBuffer unwrapped = newKey(); + ByteBuffer wrapped = transientState.kmsClient.wrapKey(unwrapped, tableKeyId); + EncryptedKey key = new BaseEncryptedKey(KEY_ENCRYPTION_KEY_ID, wrapped, tableKeyId, null); + + // update internal tracking + transientState.unwrappedKeyCache.put(key.keyId(), unwrapped); + transientState.encryptionKeys.put(key.keyId(), key); + } + + return KEY_ENCRYPTION_KEY_ID; + } + + ByteBuffer encryptedByKey(String manifestListKeyID) { + if (transientState == null) { + throw new IllegalStateException("Cannot find key encryption key after serialization"); + } + + EncryptedKey encryptedKeyMetadata = transientState.encryptionKeys.get(manifestListKeyID); + if (encryptedKeyMetadata == null) { throw new IllegalStateException( - "Cannot wrap key after called after serialization (missing KMS client)"); + "Cannot find manifest list key metadata with id " + manifestListKeyID); + } + + return transientState.unwrappedKeyCache.get(encryptedKeyMetadata.encryptedById()); + } + + ByteBuffer encryptedKeyMetadata(String manifestListKeyID) { + if (transientState == null) { + throw new IllegalStateException("Cannot find encrypted key metadata after serialization"); + } + + EncryptedKey encryptedKeyMetadata = transientState.encryptionKeys.get(manifestListKeyID); + if (encryptedKeyMetadata == null) { + throw new IllegalStateException( + "Cannot find manifest list key metadata with id " + manifestListKeyID); + } + + return 
encryptedKeyMetadata.encryptedKeyMetadata(); + } + + public String addManifestListKeyMetadata(NativeEncryptionKeyMetadata keyMetadata) { + if (transientState == null) { + throw new IllegalStateException("Cannot add key metadata after serialization"); + } + + String manifestListKeyID = generateKeyId(); + ByteBuffer encryptedKeyMetadata = + EncryptionUtil.encryptManifestListKeyMetadata( + transientState.unwrappedKeyCache.get(keyEncryptionKeyID()), + manifestListKeyID, + keyMetadata); + BaseEncryptedKey key = + new BaseEncryptedKey(manifestListKeyID, encryptedKeyMetadata, keyEncryptionKeyID(), null); + + transientState.encryptionKeys.put(key.keyId(), key); + + return manifestListKeyID; + } + + public Map encryptionKeys() { + if (transientState == null) { + throw new IllegalStateException("Cannot return encryption keys after serialization"); } - return kmsClient.unwrapKey(wrappedSecretKey, tableKeyId); + return transientState.encryptionKeys; + } + + private String generateKeyId() { + byte[] idBytes = new byte[16]; + workerRNG().nextBytes(idBytes); + return Base64.getEncoder().encodeToString(idBytes); + } + + private ByteBuffer newKey() { + byte[] newKey = new byte[dataKeyLength]; + workerRNG().nextBytes(newKey); + return ByteBuffer.wrap(newKey); } private class StandardEncryptedOutputFile implements NativeEncryptionOutputFile { @@ -173,7 +315,8 @@ private AesGcmInputFile decrypted() { new AesGcmInputFile( encryptedInputFile.encryptedInputFile(), ByteBuffers.toByteArray(keyMetadata().encryptionKey()), - ByteBuffers.toByteArray(keyMetadata().aadPrefix())); + ByteBuffers.toByteArray(keyMetadata().aadPrefix()), + keyMetadata().fileLength()); } return lazyDecryptedInputFile; diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java index 98f87c65d95f..6ddea184d8c4 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java +++ 
b/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java @@ -36,7 +36,8 @@ class StandardKeyMetadata implements NativeEncryptionKeyMetadata, IndexedRecord private static final Schema SCHEMA_V1 = new Schema( required(0, "encryption_key", Types.BinaryType.get()), - optional(1, "aad_prefix", Types.BinaryType.get())); + optional(1, "aad_prefix", Types.BinaryType.get()), + optional(2, "file_length", Types.LongType.get())); private static final org.apache.avro.Schema AVRO_SCHEMA_V1 = AvroSchemaUtil.convert(SCHEMA_V1, StandardKeyMetadata.class.getCanonicalName()); @@ -49,20 +50,31 @@ class StandardKeyMetadata implements NativeEncryptionKeyMetadata, IndexedRecord private ByteBuffer encryptionKey; private ByteBuffer aadPrefix; - private org.apache.avro.Schema avroSchema; + private Long fileLength; /** Used by Avro reflection to instantiate this class * */ StandardKeyMetadata() {} StandardKeyMetadata(byte[] key, byte[] aad) { + this(key, aad, null); + } + + StandardKeyMetadata(byte[] key, byte[] aad, Long fileLength) { this.encryptionKey = ByteBuffer.wrap(key); this.aadPrefix = ByteBuffer.wrap(aad); + this.fileLength = fileLength; } - private StandardKeyMetadata(ByteBuffer encryptionKey, ByteBuffer aadPrefix) { - this.encryptionKey = encryptionKey; - this.aadPrefix = aadPrefix; - this.avroSchema = AVRO_SCHEMA_V1; + /** + * Copy constructor. + * + * @param toCopy a StandardKeyMetadata to copy + * @param fileLength file length that overrides toCopy if not null + */ + private StandardKeyMetadata(StandardKeyMetadata toCopy, Long fileLength) { + this.encryptionKey = toCopy.encryptionKey; + this.aadPrefix = toCopy.aadPrefix; + this.fileLength = fileLength != null ? 
fileLength : toCopy.fileLength; } static Map supportedSchemaVersions() { @@ -83,6 +95,11 @@ public ByteBuffer aadPrefix() { return aadPrefix; } + @Override + public Long fileLength() { + return fileLength; + } + static StandardKeyMetadata castOrParse(EncryptionKeyMetadata keyMetadata) { if (keyMetadata instanceof StandardKeyMetadata) { return (StandardKeyMetadata) keyMetadata; @@ -116,7 +133,12 @@ public ByteBuffer buffer() { @Override public EncryptionKeyMetadata copy() { - return new StandardKeyMetadata(encryptionKey(), aadPrefix()); + return new StandardKeyMetadata(this, null); + } + + @Override + public NativeEncryptionKeyMetadata copyWithLength(long length) { + return new StandardKeyMetadata(this, length); } @Override @@ -128,6 +150,9 @@ public void put(int i, Object v) { case 1: this.aadPrefix = (ByteBuffer) v; return; + case 2: + this.fileLength = (Long) v; + return; default: // ignore the object, it must be from a newer version of the format } @@ -140,6 +165,8 @@ public Object get(int i) { return encryptionKey; case 1: return aadPrefix; + case 2: + return fileLength; default: throw new UnsupportedOperationException("Unknown field ordinal: " + i); } @@ -147,6 +174,6 @@ public Object get(int i) { @Override public org.apache.avro.Schema getSchema() { - return avroSchema; + return AVRO_SCHEMA_V1; } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index edcbc5229362..85ecff083f8e 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -47,6 +47,8 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.KeyManagementClient; import org.apache.iceberg.exceptions.AlreadyExistsException; 
import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -158,6 +160,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private Integer pageSize = null; private CloseableGroup closeables = null; private Set endpoints; + private KeyManagementClient kmsClient = null; enum SnapshotMode { ALL, @@ -251,6 +254,9 @@ public void initialize(String name, Map unresolved) { this.reportingViaRestEnabled = PropertyUtil.propertyAsBoolean(mergedProps, REST_METRICS_REPORTING_ENABLED, true); + if (mergedProps.containsKey(CatalogProperties.ENCRYPTION_KMS_IMPL)) { + this.kmsClient = EncryptionUtil.createKmsClient(mergedProps); + } super.initialize(name, mergedProps); } @@ -446,6 +452,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { paths.table(finalIdentifier), Map::of, tableFileIO(context, tableConf, response.credentials()), + kmsClient, tableMetadata, endpoints); @@ -525,6 +532,7 @@ public Table registerTable( paths.table(ident), Map::of, tableFileIO(context, tableConf, response.credentials()), + kmsClient, response.tableMetadata(), endpoints); @@ -784,6 +792,7 @@ public Table create() { paths.table(ident), Map::of, tableFileIO(context, tableConf, response.credentials()), + kmsClient, response.tableMetadata(), endpoints); @@ -811,6 +820,7 @@ public Transaction createTransaction() { paths.table(ident), Map::of, tableFileIO(context, tableConf, response.credentials()), + kmsClient, RESTTableOperations.UpdateType.CREATE, createChanges(meta), meta, @@ -874,6 +884,7 @@ public Transaction replaceTransaction() { paths.table(ident), Map::of, tableFileIO(context, tableConf, response.credentials()), + kmsClient, RESTTableOperations.UpdateType.REPLACE, changes.build(), base, diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java index 5f6c28b32337..bd12d88f77ec 100644 --- 
a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java @@ -31,12 +31,19 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.UpdateRequirement; import org.apache.iceberg.UpdateRequirements; +import org.apache.iceberg.encryption.EncryptedKey; +import org.apache.iceberg.encryption.EncryptingFileIO; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.KeyManagementClient; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; +import org.apache.iceberg.encryption.StandardEncryptionManager; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.rest.requests.UpdateTableRequest; import org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; @@ -55,20 +62,37 @@ enum UpdateType { private final String path; private final Supplier> headers; private final FileIO io; + private final KeyManagementClient kmsClient; private final List createChanges; private final TableMetadata replaceBase; private final Set endpoints; private UpdateType updateType; private TableMetadata current; + private EncryptionManager encryptionManager; + private EncryptingFileIO encryptingFileIO; + private String encryptionKeyId; + private int encryptionDekLength; + private List encryptedKeysFromMetadata; + RESTTableOperations( RESTClient client, String path, Supplier> headers, FileIO io, + KeyManagementClient kmsClient, TableMetadata current, Set endpoints) { - this(client, path, headers, io, 
UpdateType.SIMPLE, Lists.newArrayList(), current, endpoints); + this( + client, + path, + headers, + io, + kmsClient, + UpdateType.SIMPLE, + Lists.newArrayList(), + current, + endpoints); } RESTTableOperations( @@ -76,6 +100,7 @@ enum UpdateType { String path, Supplier> headers, FileIO io, + KeyManagementClient kmsClient, UpdateType updateType, List createChanges, TableMetadata current, @@ -84,6 +109,7 @@ enum UpdateType { this.path = path; this.headers = headers; this.io = io; + this.kmsClient = kmsClient; this.updateType = updateType; this.createChanges = createChanges; this.replaceBase = current; @@ -93,6 +119,10 @@ enum UpdateType { this.current = current; } this.endpoints = endpoints; + + // N.B. We don't use this.current because, for tables-to-be-created, it would be null, + // and we still want encrypted properties in this case for its TableOperations. + encryptionPropsFromMetadata(current); } @Override @@ -113,6 +143,18 @@ public void commit(TableMetadata base, TableMetadata metadata) { Consumer errorHandler; List requirements; List updates; + + TableMetadata metadataToCommit = metadata; + if (encryption() instanceof StandardEncryptionManager) { + TableMetadata.Builder builder = TableMetadata.buildFrom(metadata); + for (Map.Entry entry : + EncryptionUtil.encryptionKeys(encryption()).entrySet()) { + builder.addEncryptionKey(entry.getValue()); + } + metadataToCommit = builder.build(); + // TODO(smaheshwar): Think about requirements. 
+ } + switch (updateType) { case CREATE: Preconditions.checkState( @@ -120,7 +162,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { updates = ImmutableList.builder() .addAll(createChanges) - .addAll(metadata.changes()) + .addAll(metadataToCommit.changes()) .build(); requirements = UpdateRequirements.forCreateTable(updates); errorHandler = ErrorHandlers.tableErrorHandler(); // throws NoSuchTableException @@ -131,7 +173,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { updates = ImmutableList.builder() .addAll(createChanges) - .addAll(metadata.changes()) + .addAll(metadataToCommit.changes()) .build(); // use the original replace base metadata because the transaction will refresh requirements = UpdateRequirements.forReplaceTable(replaceBase, updates); @@ -140,7 +182,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { case SIMPLE: Preconditions.checkState(base != null, "Invalid base metadata: null"); - updates = metadata.changes(); + updates = metadataToCommit.changes(); requirements = UpdateRequirements.forUpdateTable(base, updates); errorHandler = ErrorHandlers.tableCommitHandler(); break; @@ -166,7 +208,67 @@ public void commit(TableMetadata base, TableMetadata metadata) { @Override public FileIO io() { - return io; + if (encryptionKeyId == null) { + return io; + } + + if (encryptingFileIO == null) { + encryptingFileIO = EncryptingFileIO.combine(io, encryption()); + } + + return encryptingFileIO; + } + + @Override + public EncryptionManager encryption() { + if (encryptionManager != null) { + return encryptionManager; + } + + if (encryptionKeyId != null) { + if (kmsClient == null) { + throw new RuntimeException( + "Cant create encryption manager, because key management client is not set"); + } + + Map tableProperties = Maps.newHashMap(); + tableProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, encryptionKeyId); + tableProperties.put( + TableProperties.ENCRYPTION_DEK_LENGTH, 
String.valueOf(encryptionDekLength)); + encryptionManager = + EncryptionUtil.createEncryptionManager( + encryptedKeysFromMetadata, tableProperties, kmsClient); + } else { + return PlaintextEncryptionManager.instance(); + } + + return encryptionManager; + } + + private void encryptionPropsFromMetadata(TableMetadata metadata) { + // TODO(smaheshwar): Check generally for changed encryption-related properties! + if (metadata == null || metadata.properties() == null) { + return; + } + + encryptedKeysFromMetadata = metadata.encryptionKeys(); + + Map tableProperties = metadata.properties(); + if (encryptionKeyId == null) { + encryptionKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY); + } + + if (encryptionKeyId != null && encryptionDekLength <= 0) { + String dekLength = tableProperties.get(TableProperties.ENCRYPTION_DEK_LENGTH); + encryptionDekLength = + (dekLength == null) + ? TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT + : Integer.parseInt(dekLength); + } + + // Force re-creation of encryptingFileIO and encryptionManager + encryptingFileIO = null; + encryptionManager = null; } private TableMetadata updateCurrentMetadata(LoadTableResponse response) { @@ -175,6 +277,7 @@ private TableMetadata updateCurrentMetadata(LoadTableResponse response) { // safely ignored. there is no requirement to update config on refresh or commit. if (current == null || !Objects.equals(current.metadataFileLocation(), response.metadataLocation())) { + encryptionPropsFromMetadata(response.tableMetadata()); this.current = response.tableMetadata(); } diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java b/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java new file mode 100644 index 000000000000..8682e56d4a2f --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.UUID; +import org.apache.avro.InvalidAvroMagicException; +import org.apache.iceberg.encryption.EncryptingFileIO; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionTestHelpers; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.inmemory.InMemoryFileIO; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +public class TestManifestListEncryption { + private static final String PATH = "s3://bucket/table/m1.avro"; + private static final long LENGTH = 1024L; + private static final int SPEC_ID = 1; + private static final long SEQ_NUM = 34L; + private static final long MIN_SEQ_NUM = 10L; + private static final long SNAPSHOT_ID = 987134631982734L; + private static final int ADDED_FILES = 2; + private static final long 
ADDED_ROWS = 5292L; + private static final int EXISTING_FILES = 343; + private static final long EXISTING_ROWS = 857273L; + private static final int DELETED_FILES = 1; + private static final long DELETED_ROWS = 22910L; + private static final long FIRST_ROW_ID = 100L; + private static final long SNAPSHOT_FIRST_ROW_ID = 130L; + + private static final ByteBuffer FIRST_SUMMARY_LOWER_BOUND = + Conversions.toByteBuffer(Types.IntegerType.get(), 10); + private static final ByteBuffer FIRST_SUMMARY_UPPER_BOUND = + Conversions.toByteBuffer(Types.IntegerType.get(), 100); + private static final ByteBuffer SECOND_SUMMARY_LOWER_BOUND = + Conversions.toByteBuffer(Types.IntegerType.get(), 20); + private static final ByteBuffer SECOND_SUMMARY_UPPER_BOUND = + Conversions.toByteBuffer(Types.IntegerType.get(), 200); + + private static final List PARTITION_SUMMARIES = + Lists.newArrayList( + new GenericPartitionFieldSummary( + false, FIRST_SUMMARY_LOWER_BOUND, FIRST_SUMMARY_UPPER_BOUND), + new GenericPartitionFieldSummary( + true, false, SECOND_SUMMARY_LOWER_BOUND, SECOND_SUMMARY_UPPER_BOUND)); + private static final ByteBuffer MANIFEST_KEY_METADATA = ByteBuffer.allocate(100); + + private static final ManifestFile TEST_MANIFEST = + new GenericManifestFile( + PATH, + LENGTH, + SPEC_ID, + ManifestContent.DATA, + SEQ_NUM, + MIN_SEQ_NUM, + SNAPSHOT_ID, + PARTITION_SUMMARIES, + MANIFEST_KEY_METADATA, + ADDED_FILES, + ADDED_ROWS, + EXISTING_FILES, + EXISTING_ROWS, + DELETED_FILES, + DELETED_ROWS, + FIRST_ROW_ID); + + private static final EncryptionManager ENCRYPTION_MANAGER = + EncryptionTestHelpers.createEncryptionManager(); + + @Test + public void testV2Write() throws IOException { + ManifestFile manifest = writeAndReadEncryptedManifestList(); + + assertThat(manifest.path()).isEqualTo(PATH); + assertThat(manifest.length()).isEqualTo(LENGTH); + assertThat(manifest.partitionSpecId()).isEqualTo(SPEC_ID); + assertThat(manifest.content()).isEqualTo(ManifestContent.DATA); + 
assertThat(manifest.sequenceNumber()).isEqualTo(SEQ_NUM); + assertThat(manifest.minSequenceNumber()).isEqualTo(MIN_SEQ_NUM); + assertThat((long) manifest.snapshotId()).isEqualTo(SNAPSHOT_ID); + assertThat((int) manifest.addedFilesCount()).isEqualTo(ADDED_FILES); + assertThat((long) manifest.addedRowsCount()).isEqualTo(ADDED_ROWS); + assertThat((int) manifest.existingFilesCount()).isEqualTo(EXISTING_FILES); + assertThat((long) manifest.existingRowsCount()).isEqualTo(EXISTING_ROWS); + assertThat((int) manifest.deletedFilesCount()).isEqualTo(DELETED_FILES); + assertThat((long) manifest.deletedRowsCount()).isEqualTo(DELETED_ROWS); + assertThat(manifest.content()).isEqualTo(ManifestContent.DATA); + } + + private ManifestFile writeAndReadEncryptedManifestList() throws IOException { + FileIO io = new InMemoryFileIO(); + EncryptingFileIO encryptingFileIO = EncryptingFileIO.combine(io, ENCRYPTION_MANAGER); + OutputFile outputFile = io.newOutputFile("memory:" + UUID.randomUUID()); + + ManifestListWriter writer = + ManifestLists.write( + 3, + outputFile, + encryptingFileIO.encryptionManager(), + SNAPSHOT_ID, + SNAPSHOT_ID - 1, + SEQ_NUM, + SNAPSHOT_FIRST_ROW_ID); + writer.add(TEST_MANIFEST); + writer.close(); + ManifestListFile manifestListFile = writer.toManifestListFile(); + + // First try to read without decryption + assertThatThrownBy(() -> ManifestLists.read(outputFile.toInputFile())) + .isInstanceOf(RuntimeIOException.class) + .hasMessageContaining("Failed to open file") + .hasCauseInstanceOf(InvalidAvroMagicException.class); + + List manifests = + ManifestLists.read(encryptingFileIO.newInputFile(manifestListFile)); + assertThat(manifests.size()).isEqualTo(1); + + return manifests.get(0); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java index b7f60920a6a5..926667d9476e 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java +++ 
b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java @@ -59,7 +59,6 @@ public void testJsonConversion() throws IOException { Snapshot snapshot = SnapshotParser.fromJson(json); assertThat(snapshot.snapshotId()).isEqualTo(expected.snapshotId()); - assertThat(snapshot.allManifests(ops.io())).isEqualTo(expected.allManifests(ops.io())); assertThat(snapshot.operation()).isNull(); assertThat(snapshot.summary()).isNull(); assertThat(snapshot.schemaId()).isEqualTo(1); diff --git a/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java index 6d4be7671157..901f8080ff1e 100644 --- a/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java @@ -34,7 +34,6 @@ public static EncryptionManager createEncryptionManager() { CatalogProperties.ENCRYPTION_KMS_IMPL, UnitestKMS.class.getCanonicalName()); Map tableProperties = Maps.newHashMap(); tableProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, UnitestKMS.MASTER_KEY_NAME1); - tableProperties.put(TableProperties.FORMAT_VERSION, "2"); return EncryptionUtil.createEncryptionManager( List.of(), tableProperties, EncryptionUtil.createKmsClient(catalogProperties)); diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java index a107a72ce63c..bd10f476f9d8 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java @@ -37,6 +37,7 @@ import org.apache.iceberg.GenericStatisticsFile; import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.Snapshot; import 
org.apache.iceberg.StatisticsFile; @@ -197,6 +198,9 @@ private static FileIO createMockFileIO(FileIO wrapped) { .thenAnswer( invocation -> wrapped.newInputFile(invocation.getArgument(0), invocation.getArgument(1))); + Mockito.when(mockIO.newInputFile(Mockito.any(ManifestListFile.class))) + .thenAnswer( + invocation -> wrapped.newInputFile((ManifestListFile) invocation.getArgument(0))); Mockito.when(mockIO.newInputFile(Mockito.any(ManifestFile.class))) .thenAnswer(invocation -> wrapped.newInputFile((ManifestFile) invocation.getArgument(0))); Mockito.when(mockIO.newInputFile(Mockito.any(DataFile.class))) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRestTableEncryption.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRestTableEncryption.java new file mode 100644 index 000000000000..b3ca0f7e7dee --- /dev/null +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRestTableEncryption.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.sql; + +import static org.apache.iceberg.Files.localInput; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.Ciphers; +import org.apache.iceberg.encryption.UnitestKMS; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.types.Types; +import org.apache.parquet.crypto.ParquetCryptoRuntimeException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestTemplate; + +// TODO(smaheshwar): This test is taken from https://github.com/apache/iceberg/pull/13066, with the +// exception of testCtas, but adapted for the REST catalog. When that merges, we can directly use +// those tests for the REST catalog as well by adding to the parameters method there, to have a +// single test class for table encryption. 
+public class TestRestTableEncryption extends CatalogTestBase { + private static Map appendCatalogEncryptionProperties(Map props) { + Map newProps = Maps.newHashMap(); + newProps.putAll(props); + newProps.put(CatalogProperties.ENCRYPTION_KMS_IMPL, UnitestKMS.class.getCanonicalName()); + return newProps; + } + + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + protected static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.REST.catalogName(), + SparkCatalogConfig.REST.implementation(), + appendCatalogEncryptionProperties( + ImmutableMap.builder() + .putAll(SparkCatalogConfig.REST.properties()) + .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) + .build()) + } + }; + } + + @BeforeEach + public void createTables() { + sql( + "CREATE TABLE %s (id bigint, data string, float float) USING iceberg " + + "TBLPROPERTIES ( " + + "'encryption.key-id'='%s')", + tableName, UnitestKMS.MASTER_KEY_NAME1); + + sql("INSERT INTO %s VALUES (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c', float('NaN'))", tableName); + } + + @AfterEach + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @TestTemplate + public void testSelect() { + List expected = + ImmutableList.of(row(1L, "a", 1.0F), row(2L, "b", 2.0F), row(3L, "c", Float.NaN)); + + assertEquals("Should return all expected rows", expected, sql("SELECT * FROM %s", tableName)); + } + + @TestTemplate + public void testInsertAndDelete() { + sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0), (6, 'f', float('NaN'))", tableName); + + List expected = + ImmutableList.of( + row(1L, "a", 1.0F), + row(2L, "b", 2.0F), + row(3L, "c", Float.NaN), + row(4L, "d", 4.0F), + row(5L, "e", 5.0F), + row(6L, "f", Float.NaN)); + + assertEquals( + "Should return all expected rows", + expected, + sql("SELECT * FROM %s ORDER BY id", tableName)); + + sql("DELETE FROM %s WHERE id < 4", tableName); + + expected = ImmutableList.of(row(4L, "d", 4.0F), 
row(5L, "e", 5.0F), row(6L, "f", Float.NaN)); + + assertEquals( + "Should return all expected rows", + expected, + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @Disabled("We don't yet check changes to encryption properties") + @TestTemplate + public void testKeyDelete() { + assertThatThrownBy( + () -> sql("ALTER TABLE %s UNSET TBLPROPERTIES (`encryption.key-id`)", tableName)) + .hasMessageContaining("Cannot remove key in encrypted table"); + } + + @TestTemplate + public void testDirectDataFileRead() { + List dataFileTable = + sql("SELECT file_path FROM %s.%s", tableName, MetadataTableType.ALL_DATA_FILES); + List dataFiles = + Streams.concat(dataFileTable.stream()) + .map(row -> (String) row[0]) + .collect(Collectors.toList()); + + if (dataFiles.isEmpty()) { + throw new RuntimeException("No data files found for table " + tableName); + } + + Schema schema = new Schema(optional(0, "id", Types.IntegerType.get())); + for (String filePath : dataFiles) { + assertThatThrownBy( + () -> + Parquet.read(localInput(filePath)) + .project(schema) + .callInit() + .build() + .iterator() + .next()) + .isInstanceOf(ParquetCryptoRuntimeException.class) + .hasMessageContaining("Trying to read file with encrypted footer. 
No keys available"); + } + } + + @TestTemplate + public void testManifestEncryption() throws IOException { + List manifestFileTable = + sql("SELECT path FROM %s.%s", tableName, MetadataTableType.MANIFESTS); + + List manifestFiles = + Streams.concat(manifestFileTable.stream()) + .map(row -> (String) row[0]) + .collect(Collectors.toList()); + + if (manifestFiles.isEmpty()) { + throw new RuntimeException("No manifest files found for table " + tableName); + } + + String metadataFolderPath = null; + + // Check encryption of manifest files + for (String manifestFilePath : manifestFiles) { + checkMetadataFileEncryption(localInput(manifestFilePath)); + + if (metadataFolderPath == null) { + metadataFolderPath = new File(manifestFilePath).getParent().replaceFirst("file:", ""); + } + } + + if (metadataFolderPath == null) { + throw new RuntimeException("No metadata folder found for table " + tableName); + } + + // Find manifest list and metadata files; check their encryption + File[] listOfMetadataFiles = new File(metadataFolderPath).listFiles(); + boolean foundManifestListFile = false; + + for (File metadataFile : listOfMetadataFiles) { + if (metadataFile.getName().startsWith("snap-")) { + foundManifestListFile = true; + checkMetadataFileEncryption(localInput(metadataFile)); + } + } + + if (!foundManifestListFile) { + throw new RuntimeException("No manifest list files found for table " + tableName); + } + } + + @TestTemplate + public void testCtas() { + String tableName = this.tableName + "_ctas"; + sql( + "CREATE TABLE %s USING iceberg " + + "TBLPROPERTIES ( " + + "'encryption.key-id'='%s') AS SELECT * FROM VALUES (1, 'a', 1.0), (2, 'b', 2.0)" + + " AS t(id, data, float)", + tableName, UnitestKMS.MASTER_KEY_NAME1); + + assertThat(sql("SELECT * FROM %s", tableName).size()).isEqualTo(2); + } + + private void checkMetadataFileEncryption(InputFile file) throws IOException { + SeekableInputStream stream = file.newStream(); + byte[] magic = new byte[4]; + stream.read(magic); + 
stream.close(); + assertThat(magic).isEqualTo(Ciphers.GCM_STREAM_MAGIC_STRING.getBytes(StandardCharsets.UTF_8)); + } +}