diff --git a/data/src/test/java/org/apache/iceberg/data/parquet/TestParquetEncryptionWithWriteSupport.java b/data/src/test/java/org/apache/iceberg/data/parquet/TestParquetEncryptionWithWriteSupport.java new file mode 100644 index 000000000000..ba382e1c8ab2 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/parquet/TestParquetEncryptionWithWriteSupport.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data.parquet; + +import static org.apache.iceberg.Files.localInput; +import static org.apache.iceberg.types.Types.NestedField.optional; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.List; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.data.DataTest; +import org.apache.iceberg.data.DataTestHelpers; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.crypto.FileEncryptionProperties; +import org.apache.parquet.crypto.ParquetCryptoRuntimeException; +import org.apache.parquet.hadoop.ParquetWriter; +import org.junit.Assert; +import org.junit.Test; + +public class TestParquetEncryptionWithWriteSupport extends DataTest { + private static final ByteBuffer fileDek = ByteBuffer.allocate(16); + private static final ByteBuffer aadPrefix = ByteBuffer.allocate(16); + + @Override + protected void writeAndValidate(Schema schema) throws IOException { + List expected = RandomGenericData.generate(schema, 100, 0L); + + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + SecureRandom rand = new SecureRandom(); + rand.nextBytes(fileDek.array()); + rand.nextBytes(aadPrefix.array()); + + try (FileAppender appender = + Parquet.write(Files.localOutput(testFile)) + .schema(schema) + .withFileEncryptionKey(fileDek) + .withAADPrefix(aadPrefix) + .createWriterFunc(GenericParquetWriter::buildWriter) + .build()) { + appender.addAll(expected); + } + + Assert.assertThrows( + "Decrypted without keys", + ParquetCryptoRuntimeException.class, + () -> + 
Parquet.read(localInput(testFile)) + .project(schema) + .createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema)) + .build() + .iterator()); + + List rows; + try (CloseableIterable reader = + Parquet.read(Files.localInput(testFile)) + .project(schema) + .withFileEncryptionKey(fileDek) + .withAADPrefix(aadPrefix) + .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema)) + .build()) { + rows = Lists.newArrayList(reader); + } + + for (int i = 0; i < expected.size(); i += 1) { + DataTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i)); + } + + // test reuseContainers + try (CloseableIterable reader = + Parquet.read(Files.localInput(testFile)) + .project(schema) + .withFileEncryptionKey(fileDek) + .withAADPrefix(aadPrefix) + .reuseContainers() + .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema)) + .build()) { + CloseableIterator it = reader.iterator(); + int idx = 0; + while (it.hasNext()) { + GenericRecord actualRecord = (GenericRecord) it.next(); + DataTestHelpers.assertEquals(schema.asStruct(), expected.get(idx), actualRecord); + idx++; + } + } + } + + @Test + public void testTwoLevelList() throws IOException { + Schema schema = + new Schema( + optional(1, "arraybytes", Types.ListType.ofRequired(3, Types.BinaryType.get())), + optional(2, "topbytes", Types.BinaryType.get())); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + + File testFile = temp.newFile(); + Assert.assertTrue(testFile.delete()); + + SecureRandom rand = new SecureRandom(); + rand.nextBytes(fileDek.array()); + rand.nextBytes(aadPrefix.array()); + FileEncryptionProperties fileEncryptionProperties = + FileEncryptionProperties.builder(fileDek.array()).withAADPrefix(aadPrefix.array()).build(); + + ParquetWriter writer = + AvroParquetWriter.builder(new Path(testFile.toURI())) + .withDataModel(GenericData.get()) + .withSchema(avroSchema) + .withEncryption(fileEncryptionProperties) + .config("parquet.avro.add-list-element-records", "true") + .config("parquet.avro.write-old-list-structure", "true") + .build(); + + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema); + List expectedByteList = new ArrayList(); + byte[] expectedByte = {0x00, 0x01}; + ByteBuffer expectedBinary = ByteBuffer.wrap(expectedByte); + expectedByteList.add(expectedBinary); + recordBuilder.set("arraybytes", expectedByteList); + recordBuilder.set("topbytes", expectedBinary); + GenericData.Record expectedRecord = recordBuilder.build(); + + writer.write(expectedRecord); + writer.close(); + + // test reuseContainers + try (CloseableIterable reader = + Parquet.read(Files.localInput(testFile)) + .project(schema) + .withFileEncryptionKey(fileDek) + .withAADPrefix(aadPrefix) + .reuseContainers() + .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema)) + .build()) { + CloseableIterator it = reader.iterator(); + Assert.assertTrue("Should have at least one row", it.hasNext()); + while (it.hasNext()) { + GenericRecord actualRecord = (GenericRecord) it.next(); + Assert.assertEquals(actualRecord.get(0, ArrayList.class).get(0), expectedBinary); + Assert.assertEquals(actualRecord.get(1, ByteBuffer.class), expectedBinary); + Assert.assertFalse("Should not have more than one row", it.hasNext()); + } + } + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 
cdfb9d59b059..4c9093d5b4e7 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -48,6 +48,7 @@ import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.Locale; @@ -89,6 +90,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.PropertyUtil; import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; @@ -96,6 +98,8 @@ import org.apache.parquet.avro.AvroWriteSupport; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.crypto.FileDecryptionProperties; +import org.apache.parquet.crypto.FileEncryptionProperties; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetOutputFormat; @@ -113,7 +117,8 @@ private Parquet() {} Sets.newHashSet( "parquet.read.filter", "parquet.private.read.filter.predicate", - "parquet.read.support.class"); + "parquet.read.support.class", + "parquet.crypto.factory.class"); public static WriteBuilder write(OutputFile file) { return new WriteBuilder(file); @@ -132,6 +137,8 @@ public static class WriteBuilder { private ParquetFileWriter.Mode writeMode = ParquetFileWriter.Mode.CREATE; private WriterVersion writerVersion = WriterVersion.PARQUET_1_0; private Function, Context> createContextFunc = Context::dataContext; + private ByteBuffer fileEncryptionKey = null; + private ByteBuffer fileAADPrefix = null; private WriteBuilder(OutputFile file) { this.file = file; @@ -204,6 +211,16 @@ public WriteBuilder writerVersion(WriterVersion version) { return this; } + public WriteBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { + this.fileEncryptionKey = encryptionKey; + return this; + } + + public WriteBuilder withAADPrefix(ByteBuffer aadPrefix) { + this.fileAADPrefix = aadPrefix; + return this; + } + @SuppressWarnings("unchecked") private WriteSupport getWriteSupport(MessageType type) { if (writeSupport != null) { @@ -275,6 +292,20 @@ public FileAppender build() throws IOException { set("parquet.avro.write-old-list-structure", "false"); MessageType type = ParquetSchemaUtil.convert(schema, name); + FileEncryptionProperties fileEncryptionProperties = null; + if (fileEncryptionKey != null) { + byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey); + byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix); + + fileEncryptionProperties = + FileEncryptionProperties.builder(encryptionKeyArray) + .withAADPrefix(aadPrefixArray) + .withoutAADPrefixStorage() + .build(); + } else { + Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key"); + } + if (createWriterFunc != null) { Preconditions.checkArgument( writeSupport == null, "Cannot write with both write support and Parquet value writer"); @@ -312,7 +343,8 @@ public FileAppender build() throws IOException { codec, parquetProperties, metricsConfig, - writeMode); + writeMode, + fileEncryptionProperties); } else { ParquetWriteBuilder parquetWriteBuilder = new ParquetWriteBuilder(ParquetIO.file(file)) @@ -327,7 +359,8 @@ public FileAppender build() throws IOException { 
.withPageSize(pageSize) .withPageRowCountLimit(pageRowLimit) .withDictionaryEncoding(dictionaryEnabled) - .withDictionaryPageSize(dictionaryPageSize); + .withDictionaryPageSize(dictionaryPageSize) + .withEncryption(fileEncryptionProperties); for (Map.Entry entry : columnBloomFilterEnabled.entrySet()) { String colPath = entry.getKey(); @@ -658,6 +691,16 @@ public DataWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) { return this; } + public DataWriteBuilder withFileEncryptionKey(ByteBuffer fileEncryptionKey) { + appenderBuilder.withFileEncryptionKey(fileEncryptionKey); + return this; + } + + public DataWriteBuilder withAADPrefix(ByteBuffer aadPrefix) { + appenderBuilder.withAADPrefix(aadPrefix); + return this; + } + public DataWriteBuilder withSortOrder(SortOrder newSortOrder) { this.sortOrder = newSortOrder; return this; @@ -759,6 +802,16 @@ public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) { return this; } + public DeleteWriteBuilder withFileEncryptionKey(ByteBuffer fileEncryptionKey) { + appenderBuilder.withFileEncryptionKey(fileEncryptionKey); + return this; + } + + public DeleteWriteBuilder withAADPrefix(ByteBuffer aadPrefix) { + appenderBuilder.withAADPrefix(aadPrefix); + return this; + } + public DeleteWriteBuilder equalityFieldIds(List fieldIds) { this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds); return this; @@ -930,6 +983,8 @@ public static class ReadBuilder { private boolean reuseContainers = false; private int maxRecordsPerBatch = 10000; private NameMapping nameMapping = null; + private ByteBuffer fileEncryptionKey = null; + private ByteBuffer fileAADPrefix = null; private ReadBuilder(InputFile file) { this.file = file; @@ -1019,8 +1074,31 @@ public ReadBuilder withNameMapping(NameMapping newNameMapping) { return this; } + public ReadBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { + this.fileEncryptionKey = encryptionKey; + return this; + } + + public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) { + this.fileAADPrefix = aadPrefix; + return this; + } + @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"}) public CloseableIterable build() { + FileDecryptionProperties fileDecryptionProperties = null; + if (fileEncryptionKey != null) { + byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey); + byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix); + fileDecryptionProperties = + FileDecryptionProperties.builder() + .withFooterKey(encryptionKeyArray) + .withAADPrefix(aadPrefixArray) + .build(); + } else { + Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key"); + } + if (readerFunc != null || batchedReaderFunc != null) { ParquetReadOptions.Builder optionsBuilder; if (file instanceof HadoopInputFile) { @@ -1046,6 +1124,10 @@ public CloseableIterable build() { optionsBuilder.withRange(start, start + length); } + if (fileDecryptionProperties != null) { + optionsBuilder.withDecryption(fileDecryptionProperties); + } + ParquetReadOptions options = optionsBuilder.build(); if (batchedReaderFunc != null) { @@ -1097,8 +1179,11 @@ public CloseableIterable build() { if (filter != null) { // TODO: should not need to get the schema to push down before opening the file. 
// Parquet should allow setting a filter inside its read support + ParquetReadOptions decryptOptions = + ParquetReadOptions.builder().withDecryption(fileDecryptionProperties).build(); MessageType type; - try (ParquetFileReader schemaReader = ParquetFileReader.open(ParquetIO.file(file))) { + try (ParquetFileReader schemaReader = + ParquetFileReader.open(ParquetIO.file(file), decryptOptions)) { type = schemaReader.getFileMetaData().getSchema(); } catch (IOException e) { throw new RuntimeIOException(e); @@ -1131,6 +1216,10 @@ public CloseableIterable build() { builder.withNameMapping(nameMapping); } + if (fileDecryptionProperties != null) { + builder.withDecryption(fileDecryptionProperties); + } + return new ParquetIterable<>(builder); } } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java index db49efe61dff..577004993711 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java @@ -28,18 +28,16 @@ import org.apache.iceberg.Metrics; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.Schema; -import org.apache.iceberg.common.DynConstructors; -import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.column.ColumnWriteStore; import org.apache.parquet.column.ParquetProperties; -import org.apache.parquet.column.page.PageWriteStore; -import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; +import org.apache.parquet.crypto.FileEncryptionProperties; +import org.apache.parquet.crypto.InternalFileEncryptor; import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.ColumnChunkPageWriteStore; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageType; @@ -48,22 +46,6 @@ class ParquetWriter implements FileAppender, Closeable { private static final Metrics EMPTY_METRICS = new Metrics(0L, null, null, null, null); - private static final DynConstructors.Ctor pageStoreCtorParquet = - DynConstructors.builder(PageWriteStore.class) - .hiddenImpl( - "org.apache.parquet.hadoop.ColumnChunkPageWriteStore", - CodecFactory.BytesCompressor.class, - MessageType.class, - ByteBufferAllocator.class, - int.class) - .build(); - - private static final DynMethods.UnboundMethod flushToWriter = - DynMethods.builder("flushToFileWriter") - .hiddenImpl( - "org.apache.parquet.hadoop.ColumnChunkPageWriteStore", ParquetFileWriter.class) - .build(); - private final long targetRowGroupSize; private final Map metadata; private final ParquetProperties props; @@ -75,13 +57,15 @@ class ParquetWriter implements FileAppender, Closeable { private final ParquetFileWriter.Mode writeMode; private final OutputFile output; private final Configuration conf; + private final InternalFileEncryptor fileEncryptor; - private DynMethods.BoundMethod flushPageStoreToWriter; + private ColumnChunkPageWriteStore pageStore = null; private ColumnWriteStore writeStore; private long recordCount = 0; private long nextCheckRecordCount = 10; private boolean closed; private ParquetFileWriter writer; + private int 
rowGroupOrdinal; private static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length"; private static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64; @@ -97,7 +81,8 @@ class ParquetWriter implements FileAppender, Closeable { CompressionCodecName codec, ParquetProperties properties, MetricsConfig metricsConfig, - ParquetFileWriter.Mode writeMode) { + ParquetFileWriter.Mode writeMode, + FileEncryptionProperties encryptionProperties) { this.targetRowGroupSize = rowGroupSize; this.props = properties; this.metadata = ImmutableMap.copyOf(metadata); @@ -110,6 +95,9 @@ class ParquetWriter implements FileAppender, Closeable { this.writeMode = writeMode; this.output = output; this.conf = conf; + this.rowGroupOrdinal = 0; + this.fileEncryptor = + (encryptionProperties == null ? null : new InternalFileEncryptor(encryptionProperties)); startRowGroup(); } @@ -119,7 +107,15 @@ private void ensureWriterInitialized() { try { this.writer = new ParquetFileWriter( - ParquetIO.file(output, conf), parquetSchema, writeMode, targetRowGroupSize, 0); + ParquetIO.file(output, conf), + parquetSchema, + writeMode, + targetRowGroupSize, + 0, + columnIndexTruncateLength, + ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH, + ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, + fileEncryptor); } catch (IOException e) { throw new UncheckedIOException("Failed to create Parquet file", e); } @@ -213,7 +209,7 @@ private void flushRowGroup(boolean finished) { ensureWriterInitialized(); writer.startBlock(recordCount); writeStore.flush(); - flushPageStoreToWriter.invoke(writer); + pageStore.flushToFileWriter(writer); writer.endBlock(); if (!finished) { writeStore.close(); @@ -234,13 +230,18 @@ private void startRowGroup() { props.getMaxRowCountForPageSizeCheck()); this.recordCount = 0; - PageWriteStore pageStore = - pageStoreCtorParquet.newInstance( - compressor, parquetSchema, props.getAllocator(), this.columnIndexTruncateLength); + this.pageStore = + new ColumnChunkPageWriteStore( + compressor, + parquetSchema, + props.getAllocator(), + this.columnIndexTruncateLength, + ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, + fileEncryptor, + rowGroupOrdinal); + this.rowGroupOrdinal++; - this.flushPageStoreToWriter = flushToWriter.bind(pageStore); - this.writeStore = - props.newColumnWriteStore(parquetSchema, pageStore, (BloomFilterWriteStore) pageStore); + this.writeStore = props.newColumnWriteStore(parquetSchema, pageStore, pageStore); model.setColumnStore(writeStore); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java index 7bb89a30f8e9..da91e4dfa56a 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java @@ -35,6 +35,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.crypto.FileDecryptionProperties; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -185,7 +186,13 @@ private Map generateOffsetToStartPos(Schema schema) { return null; } - try (ParquetFileReader fileReader = newReader(file, ParquetReadOptions.builder().build())) { + FileDecryptionProperties decryptionProperties = + (options == null) ? 
null : options.getDecryptionProperties(); + + ParquetReadOptions readOptions = + ParquetReadOptions.builder().withDecryption(decryptionProperties).build(); + + try (ParquetFileReader fileReader = newReader(file, readOptions)) { Map offsetToStartPos = Maps.newHashMap(); long curRowCount = 0; diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetEncryption.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetEncryption.java new file mode 100644 index 000000000000..32923c8424b2 --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetEncryption.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import static org.apache.iceberg.Files.localInput; +import static org.apache.iceberg.Files.localOutput; +import static org.apache.iceberg.parquet.ParquetWritingTestUtils.createTempFile; +import static org.apache.iceberg.types.Types.NestedField.optional; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.SecureRandom; +import java.util.List; +import org.apache.avro.generic.GenericData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.parquet.crypto.ParquetCryptoRuntimeException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestParquetEncryption { + + private static final String columnName = "intCol"; + private static final int recordCount = 100; + private static final ByteBuffer fileDek = ByteBuffer.allocate(16); + private static final ByteBuffer aadPrefix = ByteBuffer.allocate(16); + private static File file; + private static final Schema schema = new Schema(optional(1, columnName, IntegerType.get())); + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + @Before + public void writeEncryptedFile() throws IOException { + List records = Lists.newArrayListWithCapacity(recordCount); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + for (int i = 1; i <= recordCount; i++) { + GenericData.Record record = new GenericData.Record(avroSchema); + record.put(columnName, i); + records.add(record); + } + + SecureRandom rand = new SecureRandom(); + rand.nextBytes(fileDek.array()); + rand.nextBytes(aadPrefix.array()); + + file = createTempFile(temp); + + FileAppender writer = + Parquet.write(localOutput(file)) + 
.schema(schema) + .withFileEncryptionKey(fileDek) + .withAADPrefix(aadPrefix) + .build(); + + try (Closeable toClose = writer) { + writer.addAll(Lists.newArrayList(records.toArray(new GenericData.Record[] {}))); + } + } + + @Test + public void testReadEncryptedFileWithoutKeys() throws IOException { + TestHelpers.assertThrows( + "Decrypted without keys", + ParquetCryptoRuntimeException.class, + "Trying to read file with encrypted footer. No keys available", + () -> Parquet.read(localInput(file)).project(schema).callInit().build().iterator()); + } + + @Test + public void testReadEncryptedFileWithoutAADPrefix() throws IOException { + TestHelpers.assertThrows( + "Decrypted without AAD prefix", + ParquetCryptoRuntimeException.class, + "AAD prefix used for file encryption, " + + "but not stored in file and not supplied in decryption properties", + () -> + Parquet.read(localInput(file)) + .project(schema) + .withFileEncryptionKey(fileDek) + .callInit() + .build() + .iterator()); + } + + @Test + public void testReadEncryptedFile() throws IOException { + try (CloseableIterator readRecords = + Parquet.read(localInput(file)) + .withFileEncryptionKey(fileDek) + .withAADPrefix(aadPrefix) + .project(schema) + .callInit() + .build() + .iterator()) { + for (int i = 1; i <= recordCount; i++) { + GenericData.Record readRecord = (GenericData.Record) readRecords.next(); + Assert.assertEquals(i, readRecord.get(columnName)); + } + } + } +} diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java index aeb18d43fd3d..93080e17db35 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java @@ -63,8 +63,7 @@ public void testMixedDictionaryNonDictionaryReads() throws IOException { Iterable dictionaryEncodableData = RandomData.generateDictionaryEncodableData( schema, 10000, 0L, RandomData.DEFAULT_NULL_PERCENTAGE); - try (FileAppender writer = - getParquetWriter(schema, dictionaryEncodedFile)) { + try (FileAppender writer = parquetWriter(schema, dictionaryEncodedFile)) { writer.addAll(dictionaryEncodableData); } @@ -72,7 +71,7 @@ public void testMixedDictionaryNonDictionaryReads() throws IOException { Assert.assertTrue("Delete should succeed", plainEncodingFile.delete()); Iterable nonDictionaryData = RandomData.generate(schema, 10000, 0L, RandomData.DEFAULT_NULL_PERCENTAGE); - try (FileAppender writer = getParquetWriter(schema, plainEncodingFile)) { + try (FileAppender writer = parquetWriter(schema, plainEncodingFile)) { writer.addAll(nonDictionaryData); } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryFallbackToPlainEncodingVectorizedReads.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryFallbackToPlainEncodingVectorizedReads.java index 42ea34936b5f..f8b2040c4512 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryFallbackToPlainEncodingVectorizedReads.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryFallbackToPlainEncodingVectorizedReads.java @@ 
-54,8 +54,7 @@ Iterable generateData( } @Override - FileAppender getParquetWriter(Schema schema, File testFile) - throws IOException { + FileAppender parquetWriter(Schema schema, File testFile) throws IOException { return Parquet.write(Files.localOutput(testFile)) .schema(schema) .named("test") diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java index b3f80367c7b5..53099eefa40c 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java @@ -23,6 +23,8 @@ import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.SecureRandom; import java.util.Iterator; import org.apache.avro.generic.GenericData; import org.apache.iceberg.AssertHelpers; @@ -54,8 +56,10 @@ public class TestParquetVectorizedReads extends AvroDataTest { private static final int NUM_ROWS = 200_000; - static final int BATCH_SIZE = 10_000; + private static final ByteBuffer fileDek = ByteBuffer.allocate(16); + private static final ByteBuffer aadPrefix = ByteBuffer.allocate(16); + static final int BATCH_SIZE = 10_000; static final Function IDENTITY = record -> record; @Override @@ -94,10 +98,18 @@ private void writeAndValidate( File testFile = temp.newFile(); Assert.assertTrue("Delete should succeed", testFile.delete()); - try (FileAppender writer = getParquetWriter(schema, testFile)) { + try (FileAppender writer = parquetWriter(schema, testFile)) { writer.addAll(expected); } assertRecordsMatch(schema, numRecords, expected, testFile, reuseContainers, batchSize); + + // With encryption + testFile.delete(); + try (FileAppender writer = encryptedParquetWriter(schema, testFile)) { + writer.addAll(expected); + } + + assertRecordsMatch(schema, numRecords, expected, testFile, reuseContainers, batchSize, true); } protected int getNumRows() { @@ -115,12 +127,24 @@ Iterable generateData( return transform == IDENTITY ? 
data : Iterables.transform(data, transform); } - FileAppender getParquetWriter(Schema schema, File testFile) - throws IOException { + FileAppender parquetWriter(Schema schema, File testFile) throws IOException { return Parquet.write(Files.localOutput(testFile)).schema(schema).named("test").build(); } - FileAppender getParquetV2Writer(Schema schema, File testFile) + FileAppender encryptedParquetWriter(Schema schema, File testFile) + throws IOException { + SecureRandom rand = new SecureRandom(); + rand.nextBytes(fileDek.array()); + rand.nextBytes(aadPrefix.array()); + return Parquet.write(Files.localOutput(testFile)) + .schema(schema) + .withFileEncryptionKey(fileDek) + .withAADPrefix(aadPrefix) + .named("test") + .build(); + } + + FileAppender parquetV2Writer(Schema schema, File testFile) throws IOException { return Parquet.write(Files.localOutput(testFile)) .schema(schema) @@ -129,6 +153,20 @@ FileAppender getParquetV2Writer(Schema schema, File testFile .build(); } + FileAppender encryptedParquetV2Writer(Schema schema, File testFile) + throws IOException { + SecureRandom rand = new SecureRandom(); + rand.nextBytes(fileDek.array()); + rand.nextBytes(aadPrefix.array()); + return Parquet.write(Files.localOutput(testFile)) + .schema(schema) + .withFileEncryptionKey(fileDek) + .withAADPrefix(aadPrefix) + .named("test") + .writerVersion(ParquetProperties.WriterVersion.PARQUET_2_0) + .build(); + } + void assertRecordsMatch( Schema schema, int expectedSize, @@ -137,6 +175,18 @@ void assertRecordsMatch( boolean reuseContainers, int batchSize) throws IOException { + assertRecordsMatch(schema, expectedSize, expected, testFile, reuseContainers, batchSize, false); + } + + void assertRecordsMatch( + Schema schema, + int expectedSize, + Iterable expected, + File testFile, + boolean reuseContainers, + int batchSize, + boolean encrypted) + throws IOException { Parquet.ReadBuilder readBuilder = Parquet.read(Files.localInput(testFile)) .project(schema) @@ -148,6 +198,12 @@ void assertRecordsMatch( if (reuseContainers) { readBuilder.reuseContainers(); } + + if (encrypted) { + readBuilder.withFileEncryptionKey(fileDek); + readBuilder.withAADPrefix(aadPrefix); + } + try (CloseableIterable batchReader = readBuilder.build()) { Iterator expectedIter = expected.iterator(); Iterator batches = batchReader.iterator(); @@ -279,7 +335,7 @@ public void testReadsForTypePromotedColumns() throws Exception { Assert.assertTrue("Delete should succeed", dataFile.delete()); Iterable data = generateData(writeSchema, 30000, 0L, RandomData.DEFAULT_NULL_PERCENTAGE, IDENTITY); - try (FileAppender writer = getParquetWriter(writeSchema, dataFile)) { + try (FileAppender writer = parquetWriter(writeSchema, dataFile)) { writer.addAll(data); } @@ -308,10 +364,18 @@ public void testSupportedReadsForParquetV2() throws Exception { Assert.assertTrue("Delete should succeed", dataFile.delete()); Iterable data = generateData(schema, 30000, 0L, RandomData.DEFAULT_NULL_PERCENTAGE, IDENTITY); - try (FileAppender writer = getParquetV2Writer(schema, dataFile)) { + try (FileAppender writer = parquetV2Writer(schema, dataFile)) { writer.addAll(data); } assertRecordsMatch(schema, 30000, data, dataFile, true, BATCH_SIZE); + + // With encryption + dataFile.delete(); + try (FileAppender writer = encryptedParquetV2Writer(schema, dataFile)) { + writer.addAll(data); + } + + assertRecordsMatch(schema, 30000, data, dataFile, true, BATCH_SIZE, true); } @Test @@ -323,7 +387,7 @@ public void testUnsupportedReadsForParquetV2() throws Exception { 
Assert.assertTrue("Delete should succeed", dataFile.delete()); Iterable data = generateData(schema, 30000, 0L, RandomData.DEFAULT_NULL_PERCENTAGE, IDENTITY); - try (FileAppender writer = getParquetV2Writer(schema, dataFile)) { + try (FileAppender writer = parquetV2Writer(schema, dataFile)) { writer.addAll(data); } AssertHelpers.assertThrows(
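
A minimal usage sketch of the withFileEncryptionKey / withAADPrefix hooks added by this patch, distilled from the new TestParquetEncryptionWithWriteSupport test above; it is not part of the patch itself. The 16-byte key and AAD prefix are generated with SecureRandom purely for illustration, and schema, testFile, and records are assumed to exist in the caller's scope (imports match those of the new test file). Note that supplying an AAD prefix without an encryption key fails a precondition check, and reading an encrypted file without supplying the key throws ParquetCryptoRuntimeException.

    // Illustrative 16-byte file data-encryption key (DEK) and AAD prefix.
    ByteBuffer fileDek = ByteBuffer.allocate(16);
    ByteBuffer aadPrefix = ByteBuffer.allocate(16);
    SecureRandom rand = new SecureRandom();
    rand.nextBytes(fileDek.array());
    rand.nextBytes(aadPrefix.array());

    // Write an encrypted Parquet file.
    try (FileAppender<Record> appender =
        Parquet.write(Files.localOutput(testFile))
            .schema(schema)
            .withFileEncryptionKey(fileDek)
            .withAADPrefix(aadPrefix)
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .build()) {
      appender.addAll(records);
    }

    // Read it back; the same key and AAD prefix must be supplied again.
    try (CloseableIterable<Record> reader =
        Parquet.read(Files.localInput(testFile))
            .project(schema)
            .withFileEncryptionKey(fileDek)
            .withAADPrefix(aadPrefix)
            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
            .build()) {
      List<Record> rows = Lists.newArrayList(reader);
    }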