From 8a8cbfe73d047fadc316b9b2c199748522115bfd Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Fri, 7 Nov 2025 14:30:59 +0100 Subject: [PATCH] Core, Parquet: Fix Parquet to work with OutputFiles generated by StandardEncryptionManager.encrypt().encryptingOutputFile() --- .../encryption/StandardEncryptionManager.java | 35 ++++++++-- .../parquet/TestParquetEncryption.java | 66 +++++++++++++++++++ 2 files changed, 96 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 0523eb0d7f22..bdedf8a7e423 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -29,6 +29,7 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.PositionOutputStream; import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -289,6 +290,35 @@ public StandardKeyMetadata keyMetadata() { @Override public OutputFile encryptingOutputFile() { + return this; + } + + @Override + public OutputFile plainOutputFile() { + return plainOutputFile; + } + + @Override + public PositionOutputStream create() { + return lazyEncryptingOutputFile().create(); + } + + @Override + public PositionOutputStream createOrOverwrite() { + return lazyEncryptingOutputFile().createOrOverwrite(); + } + + @Override + public String location() { + return lazyEncryptingOutputFile().location(); + } + + @Override + public InputFile toInputFile() { + return lazyEncryptingOutputFile().toInputFile(); + } + + private OutputFile lazyEncryptingOutputFile() { if (null == lazyEncryptingOutputFile) { this.lazyEncryptingOutputFile = new 
AesGcmOutputFile( @@ -299,11 +329,6 @@ public OutputFile encryptingOutputFile() { return lazyEncryptingOutputFile; } - - @Override - public OutputFile plainOutputFile() { - return plainOutputFile; - } } private static class StandardDecryptedInputFile implements NativeEncryptionInputFile { diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetEncryption.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetEncryption.java index 63f512ee632b..263da9652600 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetEncryption.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetEncryption.java @@ -22,6 +22,7 @@ import static org.apache.iceberg.Files.localOutput; import static org.apache.iceberg.parquet.ParquetWritingTestUtils.createTempFile; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.parquet.hadoop.ParquetFileWriter.EF_MAGIC_STR; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -29,14 +30,26 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.security.SecureRandom; import java.util.List; import org.apache.avro.generic.GenericData; +import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.Schema; import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionTestHelpers; +import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata; +import org.apache.iceberg.encryption.NativeEncryptionOutputFile; +import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.hadoop.HadoopOutputFile; import org.apache.iceberg.io.CloseableIterator; import 
org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types.IntegerType; import org.apache.parquet.crypto.ParquetCryptoRuntimeException; @@ -125,4 +138,57 @@ public void testReadEncryptedFile() throws IOException { } } } + + @Test + public void testReadAndWriteHadoopFile() throws IOException { + List records = Lists.newArrayListWithCapacity(RECORD_COUNT); + for (int i = 1; i <= RECORD_COUNT; i++) { + GenericRecord record = GenericRecord.create(SCHEMA.asStruct()); + record.set(0, i); + records.add(record); + } + + org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(createTempFile(temp).toURI()); + + EncryptedOutputFile encryptedOutputFile = + EncryptionTestHelpers.createEncryptionManager() + .encrypt(HadoopOutputFile.fromPath(path, new Configuration())); + NativeEncryptionKeyMetadata keyMetadata = + ((NativeEncryptionOutputFile) encryptedOutputFile).keyMetadata(); + FileAppender writer = + Parquet.write(encryptedOutputFile.encryptingOutputFile()) + .withFileEncryptionKey(keyMetadata.encryptionKey()) + .withAADPrefix(keyMetadata.aadPrefix()) + .schema(SCHEMA) + .createWriterFunc(fileSchema -> GenericParquetWriter.create(SCHEMA, fileSchema)) + .build(); + + try (writer) { + writer.addAll(Lists.newArrayList(records.toArray(new GenericRecord[] {}))); + } + + InputFile inputFile = HadoopInputFile.fromPath(path, new Configuration()); + checkFileEncryption(inputFile); + try (CloseableIterator readRecords = + Parquet.read(inputFile) + .withFileEncryptionKey(keyMetadata.encryptionKey()) + .withAADPrefix(keyMetadata.aadPrefix()) + .project(SCHEMA) + .callInit() + .build() + .iterator()) { + for (int i = 1; i <= RECORD_COUNT; i++) { + GenericData.Record readRecord = (GenericData.Record) readRecords.next(); + assertThat(readRecord.get(COLUMN_NAME)).isEqualTo(i); + } + } + } + + private void 
checkFileEncryption(InputFile inputFile) throws IOException { + SeekableInputStream stream = inputFile.newStream(); + byte[] magic = new byte[4]; + stream.read(magic); + stream.close(); + assertThat(magic).isEqualTo(EF_MAGIC_STR.getBytes(StandardCharsets.UTF_8)); + } }