@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data.parquet;

import static org.apache.iceberg.Files.localInput;
import static org.apache.iceberg.types.Types.NestedField.optional;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.DataTest;
import org.apache.iceberg.data.DataTestHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
import org.apache.parquet.hadoop.ParquetWriter;
import org.junit.Assert;
import org.junit.Test;

public class TestParquetEncryptionWithWriteSupport extends DataTest {
private static final ByteBuffer fileDek = ByteBuffer.allocate(16);
private static final ByteBuffer aadPrefix = ByteBuffer.allocate(16);

@Override
protected void writeAndValidate(Schema schema) throws IOException {
List<Record> expected = RandomGenericData.generate(schema, 100, 0L);

File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());

SecureRandom rand = new SecureRandom();
rand.nextBytes(fileDek.array());
rand.nextBytes(aadPrefix.array());

try (FileAppender<Record> appender =
Parquet.write(Files.localOutput(testFile))
.schema(schema)
.withFileEncryptionKey(fileDek)
.withAADPrefix(aadPrefix)
.createWriterFunc(GenericParquetWriter::buildWriter)
.build()) {
appender.addAll(expected);
}

Assert.assertThrows(
"Decrypted without keys",
ParquetCryptoRuntimeException.class,
() ->
Parquet.read(localInput(testFile))
.project(schema)
.createReaderFunc(
fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
.build()
.iterator());

List<Record> rows;
try (CloseableIterable<Record> reader =
Parquet.read(Files.localInput(testFile))
.project(schema)
.withFileEncryptionKey(fileDek)
.withAADPrefix(aadPrefix)
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
.build()) {
rows = Lists.newArrayList(reader);
}

for (int i = 0; i < expected.size(); i += 1) {
DataTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i));
}

// test reuseContainers
try (CloseableIterable<Record> reader =
Parquet.read(Files.localInput(testFile))
.project(schema)
.withFileEncryptionKey(fileDek)
.withAADPrefix(aadPrefix)
.reuseContainers()
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
.build()) {
CloseableIterator<Record> it = reader.iterator();
int idx = 0;
while (it.hasNext()) {
GenericRecord actualRecord = (GenericRecord) it.next();
DataTestHelpers.assertEquals(schema.asStruct(), expected.get(idx), actualRecord);
idx++;
}
}
}

@Test
public void testTwoLevelList() throws IOException {
Schema schema =
new Schema(
optional(1, "arraybytes", Types.ListType.ofRequired(3, Types.BinaryType.get())),
optional(2, "topbytes", Types.BinaryType.get()));
org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());

File testFile = temp.newFile();
Assert.assertTrue(testFile.delete());

SecureRandom rand = new SecureRandom();
rand.nextBytes(fileDek.array());
rand.nextBytes(aadPrefix.array());
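// Write the file directly with parquet-mr's Avro writer (legacy two-level list structure),
// encrypted with the same key and AAD prefix that the Iceberg reader will use below.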
FileEncryptionProperties fileEncryptionProperties =
FileEncryptionProperties.builder(fileDek.array()).withAADPrefix(aadPrefix.array()).build();

ParquetWriter<org.apache.avro.generic.GenericRecord> writer =
AvroParquetWriter.<org.apache.avro.generic.GenericRecord>builder(new Path(testFile.toURI()))
.withDataModel(GenericData.get())
.withSchema(avroSchema)
.withEncryption(fileEncryptionProperties)
.config("parquet.avro.add-list-element-records", "true")
.config("parquet.avro.write-old-list-structure", "true")
.build();

GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
List<ByteBuffer> expectedByteList = new ArrayList<>();
byte[] expectedByte = {0x00, 0x01};
ByteBuffer expectedBinary = ByteBuffer.wrap(expectedByte);
expectedByteList.add(expectedBinary);
recordBuilder.set("arraybytes", expectedByteList);
recordBuilder.set("topbytes", expectedBinary);
GenericData.Record expectedRecord = recordBuilder.build();

writer.write(expectedRecord);
writer.close();

// read the encrypted two-level-list file back through the Iceberg reader (reuseContainers enabled)
try (CloseableIterable<Record> reader =
Parquet.read(Files.localInput(testFile))
.project(schema)
.withFileEncryptionKey(fileDek)
.withAADPrefix(aadPrefix)
.reuseContainers()
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
.build()) {
CloseableIterator<Record> it = reader.iterator();
Assert.assertTrue("Should have at least one row", it.hasNext());
while (it.hasNext()) {
GenericRecord actualRecord = (GenericRecord) it.next();
Assert.assertEquals(expectedBinary, actualRecord.get(0, ArrayList.class).get(0));
Assert.assertEquals(expectedBinary, actualRecord.get(1, ByteBuffer.class));
Assert.assertFalse("Should not have more than one row", it.hasNext());
}
}
}
}
97 changes: 93 additions & 4 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -48,6 +48,7 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
@@ -89,13 +90,16 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetOutputFormat;
@@ -113,7 +117,8 @@ private Parquet() {}
Sets.newHashSet(
"parquet.read.filter",
"parquet.private.read.filter.predicate",
"parquet.read.support.class");
"parquet.read.support.class",
"parquet.crypto.factory.class");

public static WriteBuilder write(OutputFile file) {
return new WriteBuilder(file);
@@ -132,6 +137,8 @@ public static class WriteBuilder {
private ParquetFileWriter.Mode writeMode = ParquetFileWriter.Mode.CREATE;
private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
private Function<Map<String, String>, Context> createContextFunc = Context::dataContext;
private ByteBuffer fileEncryptionKey = null;
private ByteBuffer fileAADPrefix = null;

private WriteBuilder(OutputFile file) {
this.file = file;
@@ -204,6 +211,16 @@ public WriteBuilder writerVersion(WriterVersion version) {
return this;
}

// Enables native Parquet modular encryption; the key encrypts the footer and data.
public WriteBuilder withFileEncryptionKey(ByteBuffer encryptionKey) {
this.fileEncryptionKey = encryptionKey;
return this;
}

// AAD prefix for AES-GCM authentication; it is not stored in the file, so readers must supply the same prefix.
public WriteBuilder withAADPrefix(ByteBuffer aadPrefix) {
this.fileAADPrefix = aadPrefix;
return this;
}

@SuppressWarnings("unchecked")
private <T> WriteSupport<T> getWriteSupport(MessageType type) {
if (writeSupport != null) {
@@ -275,6 +292,20 @@ public <D> FileAppender<D> build() throws IOException {
set("parquet.avro.write-old-list-structure", "false");
MessageType type = ParquetSchemaUtil.convert(schema, name);

FileEncryptionProperties fileEncryptionProperties = null;
if (fileEncryptionKey != null) {
byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey);
byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix);

fileEncryptionProperties =
FileEncryptionProperties.builder(encryptionKeyArray)
.withAADPrefix(aadPrefixArray)
.withoutAADPrefixStorage()
.build();
} else {
Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key");
}

if (createWriterFunc != null) {
Preconditions.checkArgument(
writeSupport == null, "Cannot write with both write support and Parquet value writer");
@@ -312,7 +343,8 @@ public <D> FileAppender<D> build() throws IOException {
codec,
parquetProperties,
metricsConfig,
writeMode);
writeMode,
fileEncryptionProperties);
} else {
ParquetWriteBuilder<D> parquetWriteBuilder =
new ParquetWriteBuilder<D>(ParquetIO.file(file))
@@ -327,7 +359,8 @@ public <D> FileAppender<D> build() throws IOException {
.withPageSize(pageSize)
.withPageRowCountLimit(pageRowLimit)
.withDictionaryEncoding(dictionaryEnabled)
.withDictionaryPageSize(dictionaryPageSize);
.withDictionaryPageSize(dictionaryPageSize)
.withEncryption(fileEncryptionProperties);

for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
String colPath = entry.getKey();
@@ -658,6 +691,16 @@ public DataWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
return this;
}

public DataWriteBuilder withFileEncryptionKey(ByteBuffer fileEncryptionKey) {
appenderBuilder.withFileEncryptionKey(fileEncryptionKey);
return this;
}

public DataWriteBuilder withAADPrefix(ByteBuffer aadPrefix) {
appenderBuilder.withAADPrefix(aadPrefix);
return this;
}

public DataWriteBuilder withSortOrder(SortOrder newSortOrder) {
this.sortOrder = newSortOrder;
return this;
@@ -759,6 +802,16 @@ public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
return this;
}

public DeleteWriteBuilder withFileEncryptionKey(ByteBuffer fileEncryptionKey) {
appenderBuilder.withFileEncryptionKey(fileEncryptionKey);
return this;
}

public DeleteWriteBuilder withAADPrefix(ByteBuffer aadPrefix) {
appenderBuilder.withAADPrefix(aadPrefix);
return this;
}

public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
return this;
@@ -930,6 +983,8 @@ public static class ReadBuilder {
private boolean reuseContainers = false;
private int maxRecordsPerBatch = 10000;
private NameMapping nameMapping = null;
private ByteBuffer fileEncryptionKey = null;
private ByteBuffer fileAADPrefix = null;

private ReadBuilder(InputFile file) {
this.file = file;
@@ -1019,8 +1074,31 @@ public ReadBuilder withNameMapping(NameMapping newNameMapping) {
return this;
}

public ReadBuilder withFileEncryptionKey(ByteBuffer encryptionKey) {
this.fileEncryptionKey = encryptionKey;
return this;
}

public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) {
this.fileAADPrefix = aadPrefix;
return this;
}

@SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
public <D> CloseableIterable<D> build() {
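// When a key is supplied, build native Parquet decryption properties. Files written through the
// WriteBuilder above do not store the AAD prefix (withoutAADPrefixStorage), so the reader must
// pass the same prefix that was used at write time.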
FileDecryptionProperties fileDecryptionProperties = null;
if (fileEncryptionKey != null) {
byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey);
byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix);
fileDecryptionProperties =
FileDecryptionProperties.builder()
.withFooterKey(encryptionKeyArray)
.withAADPrefix(aadPrefixArray)
.build();
} else {
Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key");
}

if (readerFunc != null || batchedReaderFunc != null) {
ParquetReadOptions.Builder optionsBuilder;
if (file instanceof HadoopInputFile) {
@@ -1046,6 +1124,10 @@ public <D> CloseableIterable<D> build() {
optionsBuilder.withRange(start, start + length);
}

if (fileDecryptionProperties != null) {
optionsBuilder.withDecryption(fileDecryptionProperties);
}

ParquetReadOptions options = optionsBuilder.build();

if (batchedReaderFunc != null) {
@@ -1097,8 +1179,11 @@ public <D> CloseableIterable<D> build() {
if (filter != null) {
// TODO: should not need to get the schema to push down before opening the file.
// Parquet should allow setting a filter inside its read support
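// The schema probe below also needs the decryption properties; without them, opening an
// encrypted footer would fail before the filter can be pushed down.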
ParquetReadOptions decryptOptions =
ParquetReadOptions.builder().withDecryption(fileDecryptionProperties).build();
MessageType type;
try (ParquetFileReader schemaReader = ParquetFileReader.open(ParquetIO.file(file))) {
try (ParquetFileReader schemaReader =
ParquetFileReader.open(ParquetIO.file(file), decryptOptions)) {
type = schemaReader.getFileMetaData().getSchema();
} catch (IOException e) {
throw new RuntimeIOException(e);
@@ -1131,6 +1216,10 @@ public <D> CloseableIterable<D> build() {
builder.withNameMapping(nameMapping);
}

if (fileDecryptionProperties != null) {
builder.withDecryption(fileDecryptionProperties);
}

return new ParquetIterable<>(builder);
}
}
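
For reference, a minimal sketch of how the new encryption options fit together outside the test harness. It only uses builder calls that appear in this diff (withFileEncryptionKey, withAADPrefix, createWriterFunc/createReaderFunc); the example class name, schema, output path, and key handling are illustrative stand-ins, and in practice keys would come from a key-management service rather than a local SecureRandom.

import java.io.File;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.List;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;

public class EncryptedParquetRoundTrip { // hypothetical example class, not part of this change
  public static void main(String[] args) throws Exception {
    Schema schema =
        new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

    // 16-byte data key and AAD prefix; illustrative only.
    ByteBuffer key = ByteBuffer.allocate(16);
    ByteBuffer aadPrefix = ByteBuffer.allocate(16);
    SecureRandom rand = new SecureRandom();
    rand.nextBytes(key.array());
    rand.nextBytes(aadPrefix.array());

    File file = new File("/tmp/encrypted-example.parquet");

    // Write an encrypted file with the Iceberg generic writer.
    GenericRecord record = GenericRecord.create(schema);
    record.setField("id", 1L);
    try (FileAppender<Record> appender =
        Parquet.write(Files.localOutput(file))
            .schema(schema)
            .withFileEncryptionKey(key)
            .withAADPrefix(aadPrefix)
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .build()) {
      appender.add(record);
    }

    // Read it back; the same key and AAD prefix must be supplied.
    try (CloseableIterable<Record> reader =
        Parquet.read(Files.localInput(file))
            .project(schema)
            .withFileEncryptionKey(key)
            .withAADPrefix(aadPrefix)
            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
            .build()) {
      List<Record> rows = Lists.newArrayList(reader);
      System.out.println("read " + rows.size() + " row(s)");
    }
  }
}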