diff --git a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java index 57fed69c0172..1f52ab3682f8 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java +++ b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java @@ -112,7 +112,7 @@ public int read(byte[] b, int off, int len) throws IOException { } if (available() <= 0 && len > 0) { - throw new EOFException(); + return -1; } if (len == 0) { diff --git a/core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java b/core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java index 4db0802ea1b3..b4f723cca3e7 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java +++ b/core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java @@ -44,6 +44,8 @@ public class AesGcmOutputStream extends PositionOutputStream { private int currentBlockIndex; private boolean isHeaderWritten; private boolean lastBlockWritten; + private boolean isClosed; + private long finalPosition; AesGcmOutputStream(PositionOutputStream targetStream, byte[] aesKey, byte[] fileAadPrefix) { this.targetStream = targetStream; @@ -56,6 +58,8 @@ public class AesGcmOutputStream extends PositionOutputStream { this.currentBlockIndex = 0; this.isHeaderWritten = false; this.lastBlockWritten = false; + this.isClosed = false; + this.finalPosition = 0; } @Override @@ -66,6 +70,10 @@ public void write(int b) throws IOException { @Override public void write(byte[] b, int off, int len) throws IOException { + if (isClosed) { + throw new IOException("Writing to closed stream"); + } + if (!isHeaderWritten) { writeHeader(); } @@ -95,6 +103,10 @@ public void write(byte[] b, int off, int len) throws IOException { @Override public long getPos() throws IOException { + if (isClosed) { + return finalPosition; + } + return (long) currentBlockIndex * Ciphers.PLAIN_BLOCK_SIZE + positionInPlainBlock; } @@ -109,6 +121,9 @@ public void close() throws IOException { writeHeader(); } + finalPosition = getPos(); + isClosed = true; + encryptAndWriteBlock(); targetStream.close(); diff --git a/core/src/test/java/org/apache/iceberg/encryption/TestGcmStreams.java b/core/src/test/java/org/apache/iceberg/encryption/TestGcmStreams.java index 773d5f41af94..a954cf760baa 100644 --- a/core/src/test/java/org/apache/iceberg/encryption/TestGcmStreams.java +++ b/core/src/test/java/org/apache/iceberg/encryption/TestGcmStreams.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.encryption; -import java.io.EOFException; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -60,8 +59,7 @@ public void testEmptyFile() throws IOException { Assert.assertEquals("File size", 0, decryptedFile.getLength()); try (SeekableInputStream decryptedStream = decryptedFile.newStream()) { - Assertions.assertThatThrownBy(() -> decryptedStream.read(readBytes)) - .isInstanceOf(EOFException.class); + Assert.assertEquals("Read empty stream", -1, decryptedStream.read(readBytes)); } // check that the AAD is still verified, even for an empty file @@ -284,6 +282,7 @@ public void testRandomWriteRead() throws IOException { } encryptedStream.close(); + Assert.assertEquals("Final position in closed stream", offset, encryptedStream.getPos()); AesGcmInputFile decryptedFile = new AesGcmInputFile(Files.localInput(testFile), key, aadPrefix); @@ -380,6 +379,7 @@ public void testAlignedWriteRead() throws IOException { } encryptedStream.close(); + Assert.assertEquals("Final position in closed stream", offset, encryptedStream.getPos()); AesGcmInputFile decryptedFile = new AesGcmInputFile(Files.localInput(testFile), key, aadPrefix);