diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java index 0ca69aad194d..aa86cd27f0aa 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java @@ -20,12 +20,16 @@ package org.apache.iceberg.aws.s3; import org.apache.iceberg.aws.AwsProperties; +import org.apache.iceberg.encryption.NativeFileCryptoParameters; +import org.apache.iceberg.encryption.NativelyEncryptedFile; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.metrics.MetricsContext; import software.amazon.awssdk.services.s3.S3Client; -public class S3InputFile extends BaseS3File implements InputFile { +public class S3InputFile extends BaseS3File implements InputFile, NativelyEncryptedFile { + private NativeFileCryptoParameters nativeDecryptionParameters; /* crypto parameters for reading this file; declared without an initializer, so null until the setter below is called */ + public static S3InputFile fromLocation(String location, S3Client client, AwsProperties awsProperties, MetricsContext metrics) { return new S3InputFile(client, new S3URI(location), awsProperties, metrics); @@ -49,4 +53,14 @@ public long getLength() { public SeekableInputStream newStream() { return new S3InputStream(client(), uri(), awsProperties(), metrics()); } + + @Override + public NativeFileCryptoParameters nativeCryptoParameters() { /* NativelyEncryptedFile accessor: returns the stored reference as-is, possibly null */ + return nativeDecryptionParameters; + } + + @Override + public void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters) { /* stores the reference without validation or copying */ + this.nativeDecryptionParameters = nativeCryptoParameters; + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java index 48b2ee4f0cae..a8bb3b927bd8 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java @@ -22,6 +22,8 @@ import java.io.IOException; import
java.io.UncheckedIOException; import org.apache.iceberg.aws.AwsProperties; +import org.apache.iceberg.encryption.NativeFileCryptoParameters; +import org.apache.iceberg.encryption.NativelyEncryptedFile; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -29,7 +31,9 @@ import org.apache.iceberg.metrics.MetricsContext; import software.amazon.awssdk.services.s3.S3Client; -public class S3OutputFile extends BaseS3File implements OutputFile { +public class S3OutputFile extends BaseS3File implements OutputFile, NativelyEncryptedFile { + private NativeFileCryptoParameters nativeEncryptionParameters; /* crypto parameters for writing this file; declared without an initializer, so null until the setter below is called */ + public static S3OutputFile fromLocation(String location, S3Client client, AwsProperties awsProperties, MetricsContext metrics) { return new S3OutputFile(client, new S3URI(location), awsProperties, metrics); @@ -67,4 +71,14 @@ public PositionOutputStream createOrOverwrite() { public InputFile toInputFile() { return new S3InputFile(client(), uri(), awsProperties(), metrics()); } + + @Override + public NativeFileCryptoParameters nativeCryptoParameters() { /* NativelyEncryptedFile accessor: returns the stored reference as-is, possibly null */ + return nativeEncryptionParameters; + } + + @Override + public void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters) { /* stores the reference without validation or copying. NOTE(review): toInputFile() above builds a new S3InputFile and does not forward these parameters - confirm that is intended */ + this.nativeEncryptionParameters = nativeCryptoParameters; + } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/Ciphers.java b/core/src/main/java/org/apache/iceberg/encryption/Ciphers.java new file mode 100644 index 000000000000..11c09543f4ee --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/Ciphers.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.encryption;
+
+import java.security.GeneralSecurityException;
+import java.security.SecureRandom;
+import javax.crypto.AEADBadTagException;
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class Ciphers { // AES-GCM helpers; the byte layout handled here is nonce | encrypted bytes | GCM tag
+  private static final int NONCE_LENGTH = 12; // bytes; the nonce is stored at the start of the ciphertext
+  private static final int GCM_TAG_LENGTH = 16; // bytes; space reserved after the encrypted bytes
+  private static final int GCM_TAG_LENGTH_BITS = 8 * GCM_TAG_LENGTH;
+
+  public static class AesGcmEncryptor { // NOTE(review): reuses one Cipher instance - presumably not thread-safe
+    private final SecretKeySpec aesKey;
+    private final Cipher cipher;
+    private final SecureRandom randomGenerator;
+
+    public AesGcmEncryptor(byte[] keyBytes) { // rejects null keys and keys that are not 16, 24 or 32 bytes
+      Preconditions.checkArgument(keyBytes != null, "Key can't be null");
+      int keyLength = keyBytes.length;
+      Preconditions.checkArgument((keyLength == 16 || keyLength == 24 || keyLength == 32),
+          "Cannot use a key of length " + keyLength + " because AES only allows 16, 24 or 32 bytes");
+      this.aesKey = new SecretKeySpec(keyBytes, "AES");
+
+      try {
+        this.cipher = Cipher.getInstance("AES/GCM/NoPadding");
+      } catch (GeneralSecurityException e) {
+        throw new RuntimeException("Failed to create GCM cipher", e);
+      }
+
+      this.randomGenerator = new SecureRandom();
+    }
+
+    public byte[] encrypt(byte[] plainText, byte[] aad) { // aad may be null; draws a fresh random nonce per call
+      byte[] nonce = new byte[NONCE_LENGTH];
+      randomGenerator.nextBytes(nonce);
+      int cipherTextLength = NONCE_LENGTH + plainText.length + GCM_TAG_LENGTH;
+      byte[] cipherText = new byte[cipherTextLength];
+
+      try {
+        GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH_BITS, nonce);
+        cipher.init(Cipher.ENCRYPT_MODE, aesKey, spec);
+        if (null != aad) {
+          cipher.updateAAD(aad);
+        }
+        cipher.doFinal(plainText, 0, plainText.length, cipherText, NONCE_LENGTH); // written after the nonce slot
+      } catch (GeneralSecurityException e) {
+        throw new RuntimeException("Failed to encrypt", e);
+      }
+
+      // Prepend the nonce so that decrypt() can read it back from the first NONCE_LENGTH bytes
+      System.arraycopy(nonce, 0, cipherText, 0, NONCE_LENGTH);
+
+      return cipherText;
+    }
+  }
+
+  public static class AesGcmDecryptor { // counterpart of AesGcmEncryptor; expects nonce | encrypted bytes | tag
+    private final SecretKeySpec aesKey;
+    private final Cipher cipher;
+
+    public AesGcmDecryptor(byte[] keyBytes) { // rejects null keys and keys that are not 16, 24 or 32 bytes
+      Preconditions.checkArgument(keyBytes != null, "Key can't be null");
+      int keyLength = keyBytes.length;
+      Preconditions.checkArgument((keyLength == 16 || keyLength == 24 || keyLength == 32),
+          "Cannot use a key of length " + keyLength + " because AES only allows 16, 24 or 32 bytes");
+      this.aesKey = new SecretKeySpec(keyBytes, "AES");
+
+      try {
+        this.cipher = Cipher.getInstance("AES/GCM/NoPadding");
+      } catch (GeneralSecurityException e) {
+        throw new RuntimeException("Failed to create GCM cipher", e);
+      }
+    }
+
+    public byte[] decrypt(byte[] ciphertext, byte[] aad) { // aad may be null; it is fed to the cipher when present
+      int plainTextLength = ciphertext.length - GCM_TAG_LENGTH - NONCE_LENGTH;
+      Preconditions.checkState(plainTextLength >= 1, // NOTE(review): also rejects encrypt() output for empty input
+          "Cannot decrypt cipher text of length " + ciphertext.length +
+          " because text must be longer than GCM_TAG_LENGTH + NONCE_LENGTH bytes. Text may not be encrypted" +
+          " with AES GCM cipher");
+
+      // Get the nonce from the first NONCE_LENGTH bytes of the ciphertext
+      byte[] nonce = new byte[NONCE_LENGTH];
+      System.arraycopy(ciphertext, 0, nonce, 0, NONCE_LENGTH);
+
+      byte[] plainText = new byte[plainTextLength];
+      int inputLength = ciphertext.length - NONCE_LENGTH; // everything after the nonce: encrypted bytes plus tag
+      try {
+        GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH_BITS, nonce);
+        cipher.init(Cipher.DECRYPT_MODE, aesKey, spec);
+        if (null != aad) {
+          cipher.updateAAD(aad);
+        }
+        cipher.doFinal(ciphertext, NONCE_LENGTH, inputLength, plainText, 0);
+      } catch (AEADBadTagException e) {
+        throw new RuntimeException("GCM tag check failed. Possible reasons: wrong decryption key; or corrupt/tampered" +
+            " data. AES GCM doesn't differentiate between these two.", e);
+      } catch (GeneralSecurityException e) {
+        throw new RuntimeException("Failed to decrypt", e);
+      }
+
+      return plainText;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionAlgorithm.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionAlgorithm.java
new file mode 100644
index 000000000000..650958c5a3b7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionAlgorithm.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.encryption;
+
+/**
+ * Algorithms supported for file encryption.
+ */
+public enum EncryptionAlgorithm {
+  /**
+   * Counter mode (CTR) allows fast encryption with high throughput.
+   * It is an encryption-only cipher and does not ensure content integrity.
+   * Inputs to the CTR cipher are:
+   *   1. encryption key
+   *   2. a 16-byte initialization vector (12-byte nonce, 4-byte counter)
+   *   3. plaintext data
+   */
+  AES_CTR,
+  /**
+   * Galois/Counter mode (GCM) combines CTR with the new Galois mode of authentication.
+   * It ensures not only data confidentiality, but also data integrity.
+   * Inputs to the GCM cipher are:
+   *   1. encryption key
+   *   2. a 12-byte initialization vector
+   *   3. additional authenticated data
+   *   4. plaintext data
+   */
+  AES_GCM,
+  /**
+   * A combination of GCM and CTR that can be used for file types like Parquet,
+   * so that all modules except pages are encrypted by GCM to ensure integrity,
+   * and CTR is used for efficient encryption of bulk data.
+   * The tradeoff is that attackers would be able to tamper with page data encrypted with CTR.
+   */
+  AES_GCM_CTR
+}
diff --git a/core/src/main/java/org/apache/iceberg/encryption/NativeFileCryptoParameters.java b/core/src/main/java/org/apache/iceberg/encryption/NativeFileCryptoParameters.java
new file mode 100644
index 000000000000..c19ab2fcd759
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/encryption/NativeFileCryptoParameters.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.encryption;
+
+import java.nio.ByteBuffer;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Barebone encryption parameters, one object per content file.
+ * Carries the file encryption key (later, will be extended with column keys and AAD prefix).
+ * Applicable only to formats with native encryption support (Parquet and ORC).
+ */
+public class NativeFileCryptoParameters {
+  private final ByteBuffer fileKey; // never null once constructed (checked in the constructor)
+  private final EncryptionAlgorithm fileEncryptionAlgorithm; // null when the builder was given no algorithm
+
+  private NativeFileCryptoParameters(ByteBuffer fileKey, EncryptionAlgorithm fileEncryptionAlgorithm) {
+    Preconditions.checkState(fileKey != null, "File encryption key is not supplied");
+    this.fileKey = fileKey;
+    this.fileEncryptionAlgorithm = fileEncryptionAlgorithm;
+  }
+
+  /**
+   * Creates the builder.
+   * The key is not validated here; a null key is rejected only when {@link Builder#build()} is called.
+   * @param fileKey per-file encryption key. For example, used as "footer key" DEK in Parquet encryption.
+   */
+  public static Builder create(ByteBuffer fileKey) {
+    return new Builder(fileKey);
+  }
+
+  public static class Builder { // obtained through create(ByteBuffer); the constructor is private
+    private final ByteBuffer fileKey;
+    private EncryptionAlgorithm fileEncryptionAlgorithm; // optional; stays null unless encryptionAlgorithm() is called
+
+    private Builder(ByteBuffer fileKey) {
+      this.fileKey = fileKey;
+    }
+
+    public Builder encryptionAlgorithm(EncryptionAlgorithm encryptionAlgorithm) { // returns this for chaining
+      this.fileEncryptionAlgorithm = encryptionAlgorithm;
+      return this;
+    }
+
+    public NativeFileCryptoParameters build() { // throws IllegalStateException if the file key is null
+      return new NativeFileCryptoParameters(fileKey, fileEncryptionAlgorithm);
+    }
+  }
+
+  public ByteBuffer fileKey() { // returns the buffer passed to create() itself, not a copy or duplicate
+    return fileKey;
+  }
+
+  public EncryptionAlgorithm encryptionAlgorithm() { // may return null, see the field comment above
+    return fileEncryptionAlgorithm;
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/encryption/NativelyEncryptedFile.java b/core/src/main/java/org/apache/iceberg/encryption/NativelyEncryptedFile.java
new file mode 100644
index 000000000000..2e0f403397dc
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/encryption/NativelyEncryptedFile.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.encryption;
+
+/**
+ * This interface is applied to OutputFile and InputFile implementations, in order to enable delivery of crypto
+ * parameters (such as encryption keys etc) from the Iceberg key management module to the writers/readers of file
+ * formats that support encryption natively (Parquet and ORC).
+ */
+public interface NativelyEncryptedFile {
+  NativeFileCryptoParameters nativeCryptoParameters(); // crypto parameters previously attached via the setter
+
+  void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters); // attaches the parameters
+}
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java index 7cf6dee60b82..7393c91ce32b 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java @@ -28,6 +28,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.encryption.NativeFileCryptoParameters; +import org.apache.iceberg.encryption.NativelyEncryptedFile; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.InputFile; @@ -40,7 +42,7 @@ *

* This class is based on Parquet's HadoopInputFile. */ -public class HadoopInputFile implements InputFile { +public class HadoopInputFile implements InputFile, NativelyEncryptedFile { public static final String[] NO_LOCATION_PREFERENCE = new String[0]; private final String location; @@ -49,6 +51,7 @@ public class HadoopInputFile implements InputFile { private final Configuration conf; private FileStatus stat = null; private Long length = null; + private NativeFileCryptoParameters nativeDecryptionParameters; /* crypto parameters for reading this file; declared without an initializer, so null until the setter below is called */ public static HadoopInputFile fromLocation(CharSequence location, Configuration conf) { FileSystem fs = Util.getFs(new Path(location.toString()), conf); @@ -224,6 +227,16 @@ public boolean exists() { } } + @Override + public NativeFileCryptoParameters nativeCryptoParameters() { /* NativelyEncryptedFile accessor: returns the stored reference as-is, possibly null */ + return nativeDecryptionParameters; + } + + @Override + public void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters) { /* stores the reference without validation or copying */ + this.nativeDecryptionParameters = nativeCryptoParameters; + } + @Override public String toString() { return path.toString(); diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java index 921a7628e8f5..764725de5d0c 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopOutputFile.java @@ -24,6 +24,8 @@ import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.encryption.NativeFileCryptoParameters; +import org.apache.iceberg.encryption.NativelyEncryptedFile; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.InputFile; @@ -33,11 +35,12 @@ /** * {@link OutputFile} implementation using the Hadoop {@link FileSystem} API.
*/ -public class HadoopOutputFile implements OutputFile { +public class HadoopOutputFile implements OutputFile, NativelyEncryptedFile { private final FileSystem fs; private final Path path; private final Configuration conf; + private NativeFileCryptoParameters nativeEncryptionParameters; /* crypto parameters for writing this file; declared without an initializer, so null until the setter below is called */ public static OutputFile fromLocation(CharSequence location, Configuration conf) { Path path = new Path(location.toString()); @@ -114,4 +117,14 @@ public InputFile toInputFile() { public String toString() { return location(); } + + @Override + public NativeFileCryptoParameters nativeCryptoParameters() { /* NativelyEncryptedFile accessor: returns the stored reference as-is, possibly null */ + return nativeEncryptionParameters; + } + + @Override + public void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters) { /* stores the reference without validation or copying */ + this.nativeEncryptionParameters = nativeCryptoParameters; + } } diff --git a/core/src/test/java/org/apache/iceberg/encryption/TestCiphers.java b/core/src/test/java/org/apache/iceberg/encryption/TestCiphers.java new file mode 100644 index 000000000000..26aaeae0d486 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/encryption/TestCiphers.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */
+
+package org.apache.iceberg.encryption;
+
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCiphers {
+
+  @Test
+  public void testBasicEncrypt() {
+    testEncryptDecrypt(null); // null means that no AAD is supplied
+  }
+
+  @Test
+  public void testAAD() {
+    // Same round trip, but with additional authenticated data passed to both sides.
+    testEncryptDecrypt("abcd".getBytes(StandardCharsets.UTF_8));
+  }
+
+  private void testEncryptDecrypt(byte[] aad) {
+    // Round-trips a random payload through AES-GCM once for each of the key sizes 16, 24 and 32.
+    SecureRandom rng = new SecureRandom();
+    for (int keyLength : new int[] {16, 24, 32}) {
+      byte[] keyBytes = new byte[keyLength];
+      rng.nextBytes(keyBytes);
+      byte[] expected = new byte[16]; // typically used to encrypt DEKs
+      rng.nextBytes(expected);
+
+      Ciphers.AesGcmEncryptor encryptor = new Ciphers.AesGcmEncryptor(keyBytes);
+      byte[] encrypted = encryptor.encrypt(expected, aad);
+      byte[] actual = new Ciphers.AesGcmDecryptor(keyBytes).decrypt(encrypted, aad);
+
+      Assert.assertArrayEquals("Key length " + keyLength, expected, actual);
+    }
+  }
+}