Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
package org.apache.iceberg.aws.s3;

import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.encryption.NativeFileCryptoParameters;
import org.apache.iceberg.encryption.NativelyEncryptedFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.metrics.MetricsContext;
import software.amazon.awssdk.services.s3.S3Client;

public class S3InputFile extends BaseS3File implements InputFile {
public class S3InputFile extends BaseS3File implements InputFile, NativelyEncryptedFile {
private NativeFileCryptoParameters nativeDecryptionParameters;

public static S3InputFile fromLocation(String location, S3Client client, AwsProperties awsProperties,
MetricsContext metrics) {
return new S3InputFile(client, new S3URI(location), awsProperties, metrics);
Expand All @@ -49,4 +53,14 @@ public long getLength() {
public SeekableInputStream newStream() {
return new S3InputStream(client(), uri(), awsProperties(), metrics());
}

@Override
public NativeFileCryptoParameters nativeCryptoParameters() {
return nativeDecryptionParameters;
}

@Override
public void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters) {
this.nativeDecryptionParameters = nativeCryptoParameters;
}
}
16 changes: 15 additions & 1 deletion aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.encryption.NativeFileCryptoParameters;
import org.apache.iceberg.encryption.NativelyEncryptedFile;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.metrics.MetricsContext;
import software.amazon.awssdk.services.s3.S3Client;

public class S3OutputFile extends BaseS3File implements OutputFile {
public class S3OutputFile extends BaseS3File implements OutputFile, NativelyEncryptedFile {
private NativeFileCryptoParameters nativeEncryptionParameters;

public static S3OutputFile fromLocation(String location, S3Client client, AwsProperties awsProperties,
MetricsContext metrics) {
return new S3OutputFile(client, new S3URI(location), awsProperties, metrics);
Expand Down Expand Up @@ -67,4 +71,14 @@ public PositionOutputStream createOrOverwrite() {
public InputFile toInputFile() {
return new S3InputFile(client(), uri(), awsProperties(), metrics());
}

@Override
public NativeFileCryptoParameters nativeCryptoParameters() {
return nativeEncryptionParameters;
}

@Override
public void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters) {
this.nativeEncryptionParameters = nativeCryptoParameters;
}
}
128 changes: 128 additions & 0 deletions core/src/main/java/org/apache/iceberg/encryption/Ciphers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.encryption;

import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.AEADBadTagException;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class Ciphers {
private static final int NONCE_LENGTH = 12;
private static final int GCM_TAG_LENGTH = 16;
private static final int GCM_TAG_LENGTH_BITS = 8 * GCM_TAG_LENGTH;

public static class AesGcmEncryptor {
private final SecretKeySpec aesKey;
private final Cipher cipher;
private final SecureRandom randomGenerator;

public AesGcmEncryptor(byte[] keyBytes) {
Preconditions.checkArgument(keyBytes != null, "Key can't be null");
int keyLength = keyBytes.length;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given this is public method, should we do checkNotNull(keyBytes)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, why not

Preconditions.checkArgument((keyLength == 16 || keyLength == 24 || keyLength == 32),
"Cannot use a key of length " + keyLength + " because AES only allows 16, 24 or 32 bytes");
this.aesKey = new SecretKeySpec(keyBytes, "AES");

try {
this.cipher = Cipher.getInstance("AES/GCM/NoPadding");
} catch (GeneralSecurityException e) {
throw new RuntimeException("Failed to create GCM cipher", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wonder if you want to define an Iceberg Crypto Runtime Exception just like you did for Parquet (ParquetCryptoRuntimeException). The reason is RuntimeException is a very generic type and we will lose some meaning when converting from GeneralSecurityException.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mm, the callers won't have a type-specific reaction to this. The text itself provides the info for a post-mortem analysis. So probably this goes down to Iceberg's coding practices.

}

this.randomGenerator = new SecureRandom();
}

public byte[] encrypt(byte[] plainText, byte[] aad) {
byte[] nonce = new byte[NONCE_LENGTH];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In parquet, you have a check of excessive use of one single key(GCM_RANDOM_IV_SAME_KEY_MAX_OPS). Do you still want to do that here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Iceberg class is designed for encryption of keys, not for encryption of data; so the risk of exceeding 2 billion operations during a process lifetime (meaning creation of 2 billion parquet files), is not real.

randomGenerator.nextBytes(nonce);
int cipherTextLength = NONCE_LENGTH + plainText.length + GCM_TAG_LENGTH;
byte[] cipherText = new byte[cipherTextLength];

try {
GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH_BITS, nonce);
cipher.init(Cipher.ENCRYPT_MODE, aesKey, spec);
if (null != aad) {
cipher.updateAAD(aad);
}
cipher.doFinal(plainText, 0, plainText.length, cipherText, NONCE_LENGTH);
} catch (GeneralSecurityException e) {
throw new RuntimeException("Failed to encrypt", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comments above and other places.

}

// Add the nonce
System.arraycopy(nonce, 0, cipherText, 0, NONCE_LENGTH);

return cipherText;
}
}

public static class AesGcmDecryptor {
private final SecretKeySpec aesKey;
private final Cipher cipher;

public AesGcmDecryptor(byte[] keyBytes) {
Preconditions.checkArgument(keyBytes != null, "Key can't be null");
int keyLength = keyBytes.length;
Preconditions.checkArgument((keyLength == 16 || keyLength == 24 || keyLength == 32),
"Cannot use a key of length " + keyLength + " because AES only allows 16, 24 or 32 bytes");
this.aesKey = new SecretKeySpec(keyBytes, "AES");

try {
this.cipher = Cipher.getInstance("AES/GCM/NoPadding");
} catch (GeneralSecurityException e) {
throw new RuntimeException("Failed to create GCM cipher", e);
}
}

public byte[] decrypt(byte[] ciphertext, byte[] aad) {
int plainTextLength = ciphertext.length - GCM_TAG_LENGTH - NONCE_LENGTH;
Preconditions.checkState(plainTextLength >= 1,
"Cannot decrypt cipher text of length " + ciphertext.length +
" because text must longer than GCM_TAG_LENGTH + NONCE_LENGTH bytes. Text may not be encrypted" +
" with AES GCM cipher");

// Get the nonce from ciphertext
byte[] nonce = new byte[NONCE_LENGTH];
System.arraycopy(ciphertext, 0, nonce, 0, NONCE_LENGTH);

byte[] plainText = new byte[plainTextLength];
int inputLength = ciphertext.length - NONCE_LENGTH;
try {
GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH_BITS, nonce);
cipher.init(Cipher.DECRYPT_MODE, aesKey, spec);
if (null != aad) {
cipher.updateAAD(aad);
}
cipher.doFinal(ciphertext, NONCE_LENGTH, inputLength, plainText, 0);
} catch (AEADBadTagException e) {
throw new RuntimeException("GCM tag check failed. Possible reasons: wrong decryption key; or corrupt/tampered" +
"data. AES GCM doesn't differentiate between these two.. ", e);
} catch (GeneralSecurityException e) {
throw new RuntimeException("Failed to decrypt", e);
}

return plainText;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.encryption;

/**
* Algorithm supported for file encryption.
*/
public enum EncryptionAlgorithm {
/**
* Counter mode (CTR) allows fast encryption with high throughput.
* It is an encryption only cipher and does not ensure content integrity.
* Inputs to CTR cipher are:
* 1. encryption key
* 2. a 16-byte initialization vector (12-byte nonce, 4-byte counter)
* 3. plaintext data
*/
AES_CTR,
/**
* Galois/Counter mode (GCM) combines CTR with the new Galois mode of authentication.
* It not only ensures data confidentiality, but also ensures data integrity.
* Inputs to GCM cipher are:
* 1. encryption key
* 2. a 12-byte initialization vector
* 3. additional authenticated data
* 4. plaintext data
*/
AES_GCM,
/**
* A combination of GCM and CTR that can be used for file types like Parquet,
* so that all modules except pages are encrypted by GCM to ensure integrity,
* and CTR is used for efficient encryption of bulk data.
* The tradeoff is that attackers would be able to tamper page data encrypted with CTR.
*/
AES_GCM_CTR
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.encryption;

import java.nio.ByteBuffer;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* Barebone encryption parameters, one object per content file.
* Carries the file encryption key (later, will be extended with column keys and AAD prefix).
* Applicable only to formats with native encryption support (Parquet and ORC).
*/
public class NativeFileCryptoParameters {
private ByteBuffer fileKey;
private EncryptionAlgorithm fileEncryptionAlgorithm;

private NativeFileCryptoParameters(ByteBuffer fileKey, EncryptionAlgorithm fileEncryptionAlgorithm) {
Preconditions.checkState(fileKey != null, "File encryption key is not supplied");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

key length checking also?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

key length is checked elsewhere, we shouldn't run the checks in each method that passes a key.

this.fileKey = fileKey;
this.fileEncryptionAlgorithm = fileEncryptionAlgorithm;
}

/**
* Creates the builder.
*
* @param fileKey per-file encryption key. For example, used as "footer key" DEK in Parquet encryption.
*/
public static Builder create(ByteBuffer fileKey) {
return new Builder(fileKey);
}

public static class Builder {
private ByteBuffer fileKey;
private EncryptionAlgorithm fileEncryptionAlgorithm;

private Builder(ByteBuffer fileKey) {
this.fileKey = fileKey;
}

public Builder encryptionAlgorithm(EncryptionAlgorithm encryptionAlgorithm) {
this.fileEncryptionAlgorithm = encryptionAlgorithm;
return this;
}

public NativeFileCryptoParameters build() {
return new NativeFileCryptoParameters(fileKey, fileEncryptionAlgorithm);
}
}

public ByteBuffer fileKey() {
return fileKey;
}

public EncryptionAlgorithm encryptionAlgorithm() {
return fileEncryptionAlgorithm;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.encryption;

/**
* This interface is applied to OutputFile and InputFile implementations, in order to enable delivery of crypto
* parameters (such as encryption keys etc) from the Iceberg key management module to the writers/readers of file
* formats that support encryption natively (Parquet and ORC).
*/
public interface NativelyEncryptedFile {
Copy link
Contributor

@flyrain flyrain Jan 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move it to module api instead of putting it in core? It also requires to create an interface for NativeFileCryptoParameters. Not sure that's a better way. Would like to hear people's thoughts. cc @RussellSpitzer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on the NativeFileCryptoParameters class, located in core

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need an interface for NativeFileCryptoParameters in that case, which should located in module API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NativeFileCryptoParameters is the only class planned for these properties; having an interface just for one implementation won't be efficient. Also, this is an internal class, not designed to be a part of Iceberg API.
However, thinking about the KmsClient interface in the next PR - this one will have many implementations, and it is a part of the user-facing API in the envelope encryption. I'll move it to the API module.

NativeFileCryptoParameters nativeCryptoParameters();

void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.encryption.NativeFileCryptoParameters;
import org.apache.iceberg.encryption.NativelyEncryptedFile;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.InputFile;
Expand All @@ -40,7 +42,7 @@
* <p>
* This class is based on Parquet's HadoopInputFile.
*/
public class HadoopInputFile implements InputFile {
public class HadoopInputFile implements InputFile, NativelyEncryptedFile {
public static final String[] NO_LOCATION_PREFERENCE = new String[0];

private final String location;
Expand All @@ -49,6 +51,7 @@ public class HadoopInputFile implements InputFile {
private final Configuration conf;
private FileStatus stat = null;
private Long length = null;
private NativeFileCryptoParameters nativeDecryptionParameters;

public static HadoopInputFile fromLocation(CharSequence location, Configuration conf) {
FileSystem fs = Util.getFs(new Path(location.toString()), conf);
Expand Down Expand Up @@ -224,6 +227,16 @@ public boolean exists() {
}
}

@Override
public NativeFileCryptoParameters nativeCryptoParameters() {
return nativeDecryptionParameters;
}

@Override
public void setNativeCryptoParameters(NativeFileCryptoParameters nativeCryptoParameters) {
this.nativeDecryptionParameters = nativeCryptoParameters;
}

@Override
public String toString() {
return path.toString();
Expand Down
Loading