Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public int read(byte[] b, int off, int len) throws IOException {
}

if (available() <= 0 && len > 0) {
throw new EOFException();
return -1;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

if (len == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class AesGcmOutputStream extends PositionOutputStream {
private int currentBlockIndex;
private boolean isHeaderWritten;
private boolean lastBlockWritten;
private boolean isClosed;
private long finalPosition;

AesGcmOutputStream(PositionOutputStream targetStream, byte[] aesKey, byte[] fileAadPrefix) {
this.targetStream = targetStream;
Expand All @@ -56,6 +58,8 @@ public class AesGcmOutputStream extends PositionOutputStream {
this.currentBlockIndex = 0;
this.isHeaderWritten = false;
this.lastBlockWritten = false;
this.isClosed = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: initialize finalPosition as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

this.finalPosition = 0;
}

@Override
Expand All @@ -66,6 +70,10 @@ public void write(int b) throws IOException {

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (isClosed) {
throw new IOException("Writing to closed stream");
}

if (!isHeaderWritten) {
writeHeader();
}
Expand Down Expand Up @@ -95,6 +103,10 @@ public void write(byte[] b, int off, int len) throws IOException {

@Override
public long getPos() throws IOException {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iceberg manifest writer calls getPos after closing the avro stream. Wrong value was returned.

if (isClosed) {
return finalPosition;
}

return (long) currentBlockIndex * Ciphers.PLAIN_BLOCK_SIZE + positionInPlainBlock;
}

Expand All @@ -109,6 +121,9 @@ public void close() throws IOException {
writeHeader();
}

finalPosition = getPos();
isClosed = true;

encryptAndWriteBlock();

targetStream.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.encryption;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -60,8 +59,7 @@ public void testEmptyFile() throws IOException {
Assert.assertEquals("File size", 0, decryptedFile.getLength());

try (SeekableInputStream decryptedStream = decryptedFile.newStream()) {
Assertions.assertThatThrownBy(() -> decryptedStream.read(readBytes))
.isInstanceOf(EOFException.class);
Assert.assertEquals("Read empty stream", -1, decryptedStream.read(readBytes));
}

// check that the AAD is still verified, even for an empty file
Expand Down Expand Up @@ -284,6 +282,7 @@ public void testRandomWriteRead() throws IOException {
}

encryptedStream.close();
Assert.assertEquals("Final position in closed stream", offset, encryptedStream.getPos());

AesGcmInputFile decryptedFile =
new AesGcmInputFile(Files.localInput(testFile), key, aadPrefix);
Expand Down Expand Up @@ -380,6 +379,7 @@ public void testAlignedWriteRead() throws IOException {
}

encryptedStream.close();
Assert.assertEquals("Final position in closed stream", offset, encryptedStream.getPos());

AesGcmInputFile decryptedFile =
new AesGcmInputFile(Files.localInput(testFile), key, aadPrefix);
Expand Down