Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public long getLength() {
public SeekableInputStream newStream() {
long ciphertextLength = sourceFile.getLength();
Preconditions.checkState(
ciphertextLength >= Ciphers.GCM_STREAM_HEADER_LENGTH,
"Invalid encrypted stream: %d is shorter than the GCM stream header length",
ciphertextLength >= Ciphers.MIN_STREAM_LENGTH,
"Invalid encrypted stream: %d is shorter than the minimum possible stream length",
ciphertextLength);
return new AesGcmInputStream(sourceFile.newStream(), ciphertextLength, dataKey, fileAADPrefix);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ public class AesGcmInputStream extends SeekableInputStream {
private final SeekableInputStream sourceStream;
private final byte[] fileAADPrefix;
private final Ciphers.AesGcmDecryptor decryptor;
private final byte[] cipherBlockBuffer;
private final byte[] currentPlainBlock;
private final long numBlocks;
private final int lastCipherBlockSize;
private final long plainStreamSize;
private final byte[] singleByte;

private long plainStreamPosition;
private long currentPlainBlockIndex;
private byte[] cipherBlockBuffer;
private byte[] currentPlainBlock;
private int currentPlainBlockSize;

AesGcmInputStream(
Expand Down Expand Up @@ -107,6 +107,10 @@ private int availableInCurrentBlock() {
public int read(byte[] b, int off, int len) throws IOException {
Preconditions.checkArgument(len >= 0, "Invalid read length: " + len);

if (currentPlainBlockIndex < 0) {
decryptBlock(0);
}

if (available() <= 0 && len > 0) {
throw new EOFException();
}
Expand Down Expand Up @@ -183,16 +187,12 @@ public int read() throws IOException {
return -1;
}

int unsignedByte = singleByte[0] >= 0 ? singleByte[0] : 256 + singleByte[0];

return unsignedByte;
return singleByte[0] >= 0 ? singleByte[0] : 256 + singleByte[0];
}

@Override
public void close() throws IOException {
sourceStream.close();
this.currentPlainBlock = null;
this.cipherBlockBuffer = null;
}

private void decryptBlock(long blockIndex) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class AesGcmOutputStream extends PositionOutputStream {

Expand All @@ -31,29 +32,30 @@ public class AesGcmOutputStream extends PositionOutputStream {
.put(Ciphers.GCM_STREAM_MAGIC_ARRAY)
.putInt(Ciphers.PLAIN_BLOCK_SIZE)
.array();

private final Ciphers.AesGcmEncryptor gcmEncryptor;
private final PositionOutputStream targetStream;
private final byte[] fileAadPrefix;
private final byte[] singleByte;
private final byte[] plainBlock;
private final byte[] cipherBlock;

private byte[] plainBlock;
private byte[] cipherBlock;
private int positionInPlainBlock;
private long streamPosition;
private int currentBlockIndex;
private boolean isHeaderWritten;
private boolean lastBlockWritten;

AesGcmOutputStream(PositionOutputStream targetStream, byte[] aesKey, byte[] fileAadPrefix) {
this.targetStream = targetStream;
this.gcmEncryptor = new Ciphers.AesGcmEncryptor(aesKey);
this.fileAadPrefix = fileAadPrefix;
this.singleByte = new byte[1];
this.plainBlock = new byte[Ciphers.PLAIN_BLOCK_SIZE];
this.cipherBlock = new byte[Ciphers.CIPHER_BLOCK_SIZE];
this.positionInPlainBlock = 0;
this.streamPosition = 0;
this.currentBlockIndex = 0;
this.fileAadPrefix = fileAadPrefix;
this.isHeaderWritten = false;
this.singleByte = new byte[1];
this.lastBlockWritten = false;
}

@Override
Expand Down Expand Up @@ -85,17 +87,15 @@ public void write(byte[] b, int off, int len) throws IOException {
offset += toWrite;
remaining -= toWrite;

if (positionInPlainBlock == Ciphers.PLAIN_BLOCK_SIZE) {
if (positionInPlainBlock == plainBlock.length) {
encryptAndWriteBlock();
}
}

streamPosition += len;
}

@Override
public long getPos() throws IOException {
return streamPosition;
return (long) currentBlockIndex * Ciphers.PLAIN_BLOCK_SIZE + positionInPlainBlock;
}

@Override
Expand All @@ -109,28 +109,31 @@ public void close() throws IOException {
writeHeader();
}

if (positionInPlainBlock > 0) {
encryptAndWriteBlock();
}
encryptAndWriteBlock();

targetStream.close();
plainBlock = null;
cipherBlock = null;
}

private void writeHeader() throws IOException {

targetStream.write(HEADER_BYTES);
isHeaderWritten = true;
}

private void encryptAndWriteBlock() throws IOException {
Preconditions.checkState(
!lastBlockWritten, "Cannot encrypt block: a partial block has already been written");

if (currentBlockIndex == Integer.MAX_VALUE) {
throw new IOException("Cannot write block: exceeded Integer.MAX_VALUE blocks");
}

if (positionInPlainBlock == 0) {
throw new IOException("Empty plain block");
if (positionInPlainBlock == 0 && currentBlockIndex != 0) {
return;
}

if (positionInPlainBlock != plainBlock.length) {
// signal that a partial block has been written and must be the last
this.lastBlockWritten = true;
}

byte[] aad = Ciphers.streamBlockAAD(fileAadPrefix, currentBlockIndex);
Expand Down
56 changes: 26 additions & 30 deletions core/src/main/java/org/apache/iceberg/encryption/Ciphers.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class Ciphers {

private static final int GCM_TAG_LENGTH_BITS = 8 * GCM_TAG_LENGTH;

static final int MIN_STREAM_LENGTH = GCM_STREAM_HEADER_LENGTH + NONCE_LENGTH + GCM_TAG_LENGTH;

private Ciphers() {}

public static class AesGcmEncryptor {
Expand All @@ -54,20 +56,8 @@ public static class AesGcmEncryptor {
private final byte[] nonce;

public AesGcmEncryptor(byte[] keyBytes) {
Preconditions.checkArgument(keyBytes != null, "Key can't be null");
int keyLength = keyBytes.length;
Preconditions.checkArgument(
(keyLength == 16 || keyLength == 24 || keyLength == 32),
"Cannot use a key of length "
+ keyLength
+ " because AES only allows 16, 24 or 32 bytes");
this.aesKey = new SecretKeySpec(keyBytes, "AES");

try {
this.cipher = Cipher.getInstance("AES/GCM/NoPadding");
} catch (GeneralSecurityException e) {
throw new RuntimeException("Failed to create GCM cipher", e);
}
this.aesKey = newKey(keyBytes);
this.cipher = newCipher();

this.randomGenerator = new SecureRandom();
this.nonce = new byte[NONCE_LENGTH];
Expand All @@ -92,7 +82,7 @@ public int encrypt(
int ciphertextOffset,
byte[] aad) {
Preconditions.checkArgument(
plaintextLength > 0, "Invalid plain text length: %s", plaintextLength);
plaintextLength >= 0, "Invalid plain text length: %s", plaintextLength);
randomGenerator.nextBytes(nonce);
int enciphered;

Expand Down Expand Up @@ -136,20 +126,8 @@ public static class AesGcmDecryptor {
private final Cipher cipher;

public AesGcmDecryptor(byte[] keyBytes) {
Preconditions.checkArgument(keyBytes != null, "Key can't be null");
int keyLength = keyBytes.length;
Preconditions.checkArgument(
(keyLength == 16 || keyLength == 24 || keyLength == 32),
"Cannot use a key of length "
+ keyLength
+ " because AES only allows 16, 24 or 32 bytes");
this.aesKey = new SecretKeySpec(keyBytes, "AES");

try {
this.cipher = Cipher.getInstance("AES/GCM/NoPadding");
} catch (GeneralSecurityException e) {
throw new RuntimeException("Failed to create GCM cipher", e);
}
this.aesKey = newKey(keyBytes);
this.cipher = newCipher();
}

public byte[] decrypt(byte[] ciphertext, byte[] aad) {
Expand All @@ -172,7 +150,7 @@ public int decrypt(
int plaintextOffset,
byte[] aad) {
Preconditions.checkState(
ciphertextLength - GCM_TAG_LENGTH - NONCE_LENGTH >= 1,
ciphertextLength - GCM_TAG_LENGTH - NONCE_LENGTH >= 0,
"Cannot decrypt cipher text of length "
+ ciphertext.length
+ " because text must longer than GCM_TAG_LENGTH + NONCE_LENGTH bytes. Text may not be encrypted"
Expand Down Expand Up @@ -207,6 +185,24 @@ public int decrypt(
}
}

private static SecretKeySpec newKey(byte[] keyBytes) {
Preconditions.checkArgument(keyBytes != null, "Invalid key: null");
int keyLength = keyBytes.length;
Preconditions.checkArgument(
(keyLength == 16 || keyLength == 24 || keyLength == 32),
"Invalid key length: %s (must be 16, 24, or 32 bytes)",
keyLength);
return new SecretKeySpec(keyBytes, "AES");
}

private static Cipher newCipher() {
try {
return Cipher.getInstance("AES/GCM/NoPadding");
} catch (GeneralSecurityException e) {
throw new RuntimeException("Failed to create GCM cipher", e);
}
}

static byte[] streamBlockAAD(byte[] fileAadPrefix, int currentBlockIndex) {
byte[] blockAAD =
ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(currentBlockIndex).array();
Expand Down
Loading