Skip to content

Conversation

@ggershinsky
Copy link
Contributor

@ggershinsky ggershinsky commented Oct 6, 2021

Implements #2060

Besides encrypting and signing metadata (avro, json, puffin), this stream can also encrypt Avro data.

@github-actions github-actions bot added the core label Oct 6, 2021

public class AesGcmOutputStream extends PositionOutputStream {
// AES-GCM parameters
public static final int GCM_NONCE_LENGTH = 12; // in bytes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like we should make these variables in a shared util class, instead of importing this from the output stream to the input stream

gcmCipher = Cipher.getInstance("AES/GCM/NoPadding");
} catch (GeneralSecurityException e) {
throw new IOException(e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline after control statement

@jackye1995
Copy link
Contributor

Did a comparison against my internal impl, mostly look correct to me. Will take another look after all the remaining methods are implemented.

import static org.apache.iceberg.encryption.AesGcmOutputStream.HEADER_SIZE_LENGTH;

public class AesGcmInputStream extends SeekableInputStream {
private SeekableInputStream sourceStream;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make it final?

Copy link
Contributor Author

@ggershinsky ggershinsky Jun 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, will change in the next commit

private int numberOfBlocks;
private int lastBlockSize;
private long plainStreamSize;
private byte[] fileAadPrefix;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not used anywhere. Do we need it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, there is a TODO item referring it in the method read, never mind.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both plainStreamSize and fileAadPrefix could be final.

private long plainStreamSize;
private byte[] fileAadPrefix;

AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need it to be public since the class itself is public? Or we can have a builder.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always created by an AesGcmInputFile in the same package.

private SeekableInputStream sourceStream;
private long netSourceFileSize;

private Cipher gcmCipher;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: make it final.

Comment on lines 43 to 53
private PositionOutputStream targetStream;

private Cipher gcmCipher;
private SecureRandom random;
private SecretKey key;
private byte[] nonce;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These fields can be final.

Copy link
Contributor

@flyrain flyrain left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ggershinsky for the patch. Can we add a unit test for these two classes?

@liujinhui1994
Copy link

@ggershinsky Is this PR still going on?

@ggershinsky
Copy link
Contributor Author

same as in #2639 (comment)

@ggershinsky
Copy link
Contributor Author

Thanks @ggershinsky for the patch. Can we add a unit test for these two classes?

done


@Override
public long getLength() {
Preconditions.checkArgument(plaintextLength >= 0, "Length is known after new stream is created");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set the length if field plaintextLength is -1? Looks like this method is valid only after newStream() is called.

if(plaintextLength == -1) {
this.newStream();
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When do we close the stream in this case? Or do we just make and close it in that case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both suggestions sound good to me.

int toLoad = lastBlock ? lastBlockSize : cipherBlockSize;
int loaded = sourceStream.read(ciphertextBlockBuffer, 0, toLoad);
if (loaded != toLoad) {
throw new IOException("Read " + loaded + " instead of " + toLoad);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor suggestion: Would it be a bit easier to understand if message is "Should read toLoad data, but only get loaded data"?

@Override
public void seek(long newPos) throws IOException {
if (newPos >= plainStreamSize) {
throw new IOException("At or beyond max stream size " + plainStreamSize + ", " + newPos);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The massage could be

"The seeking position " + newPos + " has reached or exceeded the max stream size "  + plainStreamSize.


@Override
public synchronized void reset() throws IOException {
throw new UnsupportedOperationException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we don't have to implement it, the abstract class InputStream has the same logic.

try {
return new AesGcmOutputStream(targetFile.create(), dataKey, null);
} catch (IOException e) {
throw new UncheckedIOException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add message like, "Failed to create file: %s", targetFile.location()?

try {
return new AesGcmOutputStream(targetFile.createOrOverwrite(), dataKey, null);
} catch (IOException e) {
throw new UncheckedIOException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add message like, "Failed to create or overwrite file: %s", targetFile.location()?

private SecretKey key;
private byte[] nonce;

private int blockSize = 1024 * 1024;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it a final static field?

private int positionInBuffer;
private long streamPosition;
private int currentBlockIndex;
private byte[] fileAadPrefix;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be a final field.

try {
int encrypted = gcmCipher.doFinal(plaintextBlockBuffer, 0, positionInBuffer, cipherText, GCM_NONCE_LENGTH);
Preconditions.checkArgument((encrypted == (positionInBuffer + GCM_TAG_LENGTH)),
"Wrong length of encrypted output: " + encrypted + " vs " + (positionInBuffer + GCM_TAG_LENGTH));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about a message like this?

"Wrong length of encrypted output:  expected " + (positionInBuffer + GCM_TAG_LENGTH)) + " but " + encrypted;

targetStream.write(cipherText);
}

static byte[] calculateAAD(byte[] fileAadPrefix, int currentBlockIndex) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can go to a common place as well since both classes need it. We may have a util class, e.g. GcmUtil

try {
gcmCipher = Cipher.getInstance("AES/GCM/NoPadding");
} catch (GeneralSecurityException e) {
throw new IOException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add message for easier debugging?

new IOException("Failed to get an instance of GCM cipher",  e)

Comment on lines 59 to 60
Preconditions.checkArgument(fetched == AesGcmOutputStream.PREFIX_LENGTH,
"Insufficient read " + fetched);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They can be in the same line.
IIUC, this means the stream length is not long enough. Can we say something like "The stream length should be at least " + AesGcmOutputStream.PREFIX_LENGTH?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't we also in danger of the read being less than prefix bytes?

private int currentBlockIndex;
private int currentOffsetInPlainBlock;
private int numberOfBlocks;
private int lastBlockSize;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename it to lastCipherBlockSize to make it more readable?

Comment on lines 39 to 40
private SecretKey key;
private byte[] nonce;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final as well.

public TemporaryFolder temp = new TemporaryFolder();

@Test
public void testRandomWriteRead() throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this, could I ask for just 4 more test cases

Empty File (no bits)
File that aligns perfectly with encryption Chunk Size
File that is exactly one byte to larger than the aligned and one that is one byte smaller than the aligned file. (we probably hit this unaligned version with the testFileSize below but just to make sure)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will add

System.arraycopy(prefixBytes, 0, magic, 0, AesGcmOutputStream.MAGIC_ARRAY.length);

Preconditions.checkArgument(Arrays.equals(AesGcmOutputStream.MAGIC_ARRAY, magic),
"File with wrong magic string. Should start with " + AesGcmOutputStream.MAGIC_STRING);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of "file with wrong magic string" -> "Cannot read unencrypted file, missing header containing: ..."?

cipherBlockSize = plainBlockSize + AesGcmOutputStream.GCM_NONCE_LENGTH + AesGcmOutputStream.GCM_TAG_LENGTH;

try {
gcmCipher = Cipher.getInstance("AES/GCM/NoPadding");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Constant for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. We actually already have a Ciphers class; let's re-use it (with some additions) for all GCM-related things

@ggershinsky
Copy link
Contributor Author

I had a lot of comments, so I went ahead and implemented most of them on top of this. I think it simplified how the input stream works quite a bit and I fixed a lot of the comments that I made. Please see ggershinsky#6.

Thanks! Merged the PR.

}

private int availableInCurrentBlock() {
if (currentPlainBlockIndex < 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I was refactoring, there was a bug that I fixed when the block for plainStreamPosition didn't match currentPlainBlockIndex. I ended up fixing the problem by setting currentPlainBlockIndex = -1 in a place that I had missed. Now that I'm thinking about it more, I think the right solution is actually to update the check here instead. That way the bytes available will only be non-zero if the current block matches the position.

Here's a diff that does what I'm talking about and still passes tests:

diff --git a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java
index a63134f31d..88e9b36c25 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java
@@ -97,7 +97,7 @@ public class AesGcmInputStream extends SeekableInputStream {
   }
 
   private int availableInCurrentBlock() {
-    if (currentPlainBlockIndex < 0) {
+    if (blockIndex(plainStreamPosition) != currentPlainBlockIndex) {
       return 0;
     }
 
@@ -130,10 +130,6 @@ public class AesGcmInputStream extends SeekableInputStream {
         remainingBytesToRead -= bytesToCopy;
         resultBufferOffset += bytesToCopy;
         this.plainStreamPosition += bytesToCopy;
-        if (blockIndex(plainStreamPosition) != currentPlainBlockIndex) {
-          // invalidate the current block
-          this.currentPlainBlockIndex = -1;
-        }
 
       } else if (available() > 0) {
         decryptBlock(blockIndex(plainStreamPosition));
@@ -157,10 +153,6 @@ public class AesGcmInputStream extends SeekableInputStream {
     }
 
     this.plainStreamPosition = newPos;
-    if (blockIndex(plainStreamPosition) != currentPlainBlockIndex) {
-      // invalidate the current block
-      this.currentPlainBlockIndex = -1;
-    }
   }
 
   @Override
@@ -177,10 +169,6 @@ public class AesGcmInputStream extends SeekableInputStream {
     }
 
     this.plainStreamPosition += n;
-    if (blockIndex(plainStreamPosition) != currentPlainBlockIndex) {
-      // invalidate the current block
-      this.currentPlainBlockIndex = -1;
-    }
 
     return n;
   }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM. Applied this change.

this.gcmEncryptor = new Ciphers.AesGcmEncryptor(aesKey);
this.plainBlockBuffer = new byte[Ciphers.PLAIN_BLOCK_SIZE];
this.positionInBuffer = 0;
this.streamPosition = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this always be targetStream.pos() + positionInBuffer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this value is returned by

@Override
  public long getPos() throws IOException {
    return streamPosition;
  }

so is not related directly to targetStream.pos() and positionInBuffer.
I'll rename positionInBuffer to positionInPlainBlock, and plainBlockBuffer to plainBlock, to make the naming more consistent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why what I said wouldn't always be true. If so, we could eliminate this extra state.

+ enciphered
+ ". Must be "
+ plaintextLength
+ GCM_TAG_LENGTH);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be more clear because it doesn't tell the reader that the encryption didn't work right.

Instead, how about Failed to encrypt block: expected %s encrypted bytes but produced %s bytes?

@rdblue
Copy link
Contributor

rdblue commented Sep 4, 2023

@ggershinsky, I opened ggershinsky#8 with the remaining changes for this PR.

The main change is to write an empty block for empty files. Otherwise an attacker could replace a valid file with an empty file without detection.

@ggershinsky
Copy link
Contributor Author

Cool! LGTM. This could have been detected via external check of details (such as the file length, stored in parent metadata), but I agree doing this in the stream is cleaner and more reliable. Thanks for the PR, merged.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Development

Successfully merging this pull request may close these issues.

6 participants