From 80ff9d5af6fc8e9813e4484f3be89e603bd0ffbf Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Thu, 21 Dec 2023 15:49:17 +0200 Subject: [PATCH 1/2] 6762 for main branch --- .../java/org/apache/iceberg/avro/Avro.java | 22 ++++++++++++ .../iceberg/encryption/EncryptionUtil.java | 13 ++++--- .../encryption/StandardEncryptionManager.java | 17 +++++++-- .../encryption/StandardKeyMetadata.java | 8 ++--- .../iceberg/io/FileAppenderFactory.java | 11 ++++++ .../iceberg/data/BaseFileWriterFactory.java | 22 +++++------- .../org/apache/iceberg/data/DeleteFilter.java | 7 ++++ .../iceberg/data/GenericAppenderFactory.java | 29 +++++++++------ .../apache/iceberg/data/GenericReader.java | 7 ++++ .../mr/mapreduce/IcebergInputFormat.java | 14 +++++--- .../main/java/org/apache/iceberg/orc/ORC.java | 22 ++++++++++++ .../org/apache/iceberg/parquet/Parquet.java | 35 +++++++++++++++++++ .../iceberg/spark/source/BaseBatchReader.java | 30 ++++++++++++++-- .../iceberg/spark/source/BaseRowReader.java | 27 ++++++++++++-- .../iceberg/spark/source/BatchDataReader.java | 1 + .../spark/source/ChangelogRowReader.java | 1 + .../source/PositionDeletesRowReader.java | 1 + .../iceberg/spark/source/RowDataReader.java | 1 + .../spark/source/SparkAppenderFactory.java | 22 +++++++----- 19 files changed, 237 insertions(+), 53 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 85cc8d902026..1c50515c8155 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -55,6 +55,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.DeleteSchemaUtil; @@ -91,6 +92,13 @@ public static WriteBuilder write(OutputFile file) { return new WriteBuilder(file); } + public static WriteBuilder write(EncryptedOutputFile file) { + Preconditions.checkState( + file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY, + "Currently, encryption of data files in Avro format is not supported"); + return new WriteBuilder(file.encryptingOutputFile()); + } + public static class WriteBuilder { private final OutputFile file; private final Map config = Maps.newHashMap(); @@ -272,6 +280,13 @@ public static DataWriteBuilder writeData(OutputFile file) { return new DataWriteBuilder(file); } + public static DataWriteBuilder writeData(EncryptedOutputFile file) { + Preconditions.checkState( + file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY, + "Currently, encryption of data files in Avro format is not supported"); + return new DataWriteBuilder(file.encryptingOutputFile()); + } + public static class DataWriteBuilder { private final WriteBuilder appenderBuilder; private final String location; @@ -368,6 +383,13 @@ public static DeleteWriteBuilder writeDeletes(OutputFile file) { return new DeleteWriteBuilder(file); } + public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) { + Preconditions.checkState( + file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY, + "Currently, encryption of delete files in Avro format is not supported"); + return new DeleteWriteBuilder(file.encryptingOutputFile()); + } + public static class 
DeleteWriteBuilder { private final WriteBuilder appenderBuilder; private final String location; diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java index ad1deecb8da1..a538a865f0a2 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -23,6 +23,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.TableProperties; import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.PropertyUtil; @@ -86,10 +87,7 @@ public static EncryptionManager createEncryptionManager( TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); - if (FileFormat.fromString(fileFormat) != FileFormat.PARQUET) { - throw new UnsupportedOperationException( - "Iceberg encryption currently supports only parquet format for data files"); - } + boolean nativeDataEncryption = (FileFormat.fromString(fileFormat) == FileFormat.PARQUET); int dataKeyLength = PropertyUtil.propertyAsInt( @@ -102,6 +100,11 @@ public static EncryptionManager createEncryptionManager( "Invalid data key length: %s (must be 16, 24, or 32)", dataKeyLength); - return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient); + return new StandardEncryptionManager( + tableKeyId, dataKeyLength, kmsClient, nativeDataEncryption); + } + + public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) { + return new BaseEncryptedOutputFile(encryptingOutputFile, EncryptionKeyMetadata.empty()); } } diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 63f89e7661b3..612bd20119b4 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -32,6 +32,7 @@ public class StandardEncryptionManager implements EncryptionManager { private final transient KeyManagementClient kmsClient; private final String tableKeyId; private final int dataKeyLength; + private final boolean nativeDataEncryption; private transient volatile SecureRandom lazyRNG = null; @@ -41,7 +42,10 @@ public class StandardEncryptionManager implements EncryptionManager { * @param kmsClient Client of KMS used to wrap/unwrap keys in envelope encryption */ public StandardEncryptionManager( - String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { + String tableKeyId, + int dataKeyLength, + KeyManagementClient kmsClient, + boolean nativeDataEncryption) { Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null"); Preconditions.checkArgument( dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32, @@ -51,6 +55,7 @@ public StandardEncryptionManager( this.tableKeyId = tableKeyId; this.kmsClient = kmsClient; this.dataKeyLength = dataKeyLength; + this.nativeDataEncryption = nativeDataEncryption; } @Override @@ -67,7 +72,15 @@ public InputFile decrypt(EncryptedInputFile encrypted) { @Override public Iterable decrypt(Iterable encrypted) { // Bulk decrypt is only applied to data files. Returning source input files for parquet. 
- return Iterables.transform(encrypted, this::decrypt); + if (nativeDataEncryption) { + return Iterables.transform(encrypted, this::getSourceFile); + } else { + return Iterables.transform(encrypted, this::decrypt); + } + } + + private InputFile getSourceFile(EncryptedInputFile encryptedFile) { + return encryptedFile.encryptedInputFile(); } private SecureRandom workerRNG() { diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java index 2ac70aebc316..964bf2f03a45 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java @@ -31,7 +31,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types; -class StandardKeyMetadata implements EncryptionKeyMetadata, IndexedRecord { +public class StandardKeyMetadata implements EncryptionKeyMetadata, IndexedRecord { private static final byte V1 = 1; private static final Schema SCHEMA_V1 = new Schema( @@ -73,11 +73,11 @@ static Map supportedAvroSchemaVersions() { return avroSchemaVersions; } - ByteBuffer encryptionKey() { + public ByteBuffer encryptionKey() { return encryptionKey; } - ByteBuffer aadPrefix() { + public ByteBuffer aadPrefix() { return aadPrefix; } @@ -95,7 +95,7 @@ static StandardKeyMetadata castOrParse(EncryptionKeyMetadata keyMetadata) { return parse(kmBuffer); } - static StandardKeyMetadata parse(ByteBuffer buffer) { + public static StandardKeyMetadata parse(ByteBuffer buffer) { try { return KEY_METADATA_DECODER.decode(buffer); } catch (IOException e) { diff --git a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java index 59b0b4b3bf6a..1b320cae64e0 100644 --- a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java @@ -40,6 +40,17 @@ public interface FileAppenderFactory { */ FileAppender newAppender(OutputFile outputFile, FileFormat fileFormat); + /** + * Create a new {@link FileAppender}. + * + * @param outputFile an EncryptedOutputFile used to create an output stream. + * @param fileFormat File format. + * @return a newly created {@link FileAppender} + */ + default FileAppender newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) { + return newAppender(outputFile.encryptingOutputFile(), fileFormat); + } + /** * Create a new {@link DataWriter}. 
* diff --git a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java index 976b98b0a9fe..9e37c723be93 100644 --- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java @@ -35,7 +35,6 @@ import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.FileWriterFactory; -import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; @@ -93,7 +92,6 @@ protected BaseFileWriterFactory( @Override public DataWriter newDataWriter( EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { - OutputFile outputFile = file.encryptingOutputFile(); EncryptionKeyMetadata keyMetadata = file.keyMetadata(); Map properties = table.properties(); MetricsConfig metricsConfig = MetricsConfig.forTable(table); @@ -102,7 +100,7 @@ public DataWriter newDataWriter( switch (dataFileFormat) { case AVRO: Avro.DataWriteBuilder avroBuilder = - Avro.writeData(outputFile) + Avro.writeData(file) .schema(dataSchema) .setAll(properties) .metricsConfig(metricsConfig) @@ -118,7 +116,7 @@ public DataWriter newDataWriter( case PARQUET: Parquet.DataWriteBuilder parquetBuilder = - Parquet.writeData(outputFile) + Parquet.writeData(file) .schema(dataSchema) .setAll(properties) .metricsConfig(metricsConfig) @@ -134,7 +132,7 @@ public DataWriter newDataWriter( case ORC: ORC.DataWriteBuilder orcBuilder = - ORC.writeData(outputFile) + ORC.writeData(file) .schema(dataSchema) .setAll(properties) .metricsConfig(metricsConfig) @@ -160,7 +158,6 @@ public DataWriter newDataWriter( @Override public EqualityDeleteWriter newEqualityDeleteWriter( EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { - OutputFile outputFile = file.encryptingOutputFile(); EncryptionKeyMetadata keyMetadata = file.keyMetadata(); Map properties = table.properties(); MetricsConfig metricsConfig = MetricsConfig.forTable(table); @@ -169,7 +166,7 @@ public EqualityDeleteWriter newEqualityDeleteWriter( switch (deleteFileFormat) { case AVRO: Avro.DeleteWriteBuilder avroBuilder = - Avro.writeDeletes(outputFile) + Avro.writeDeletes(file) .setAll(properties) .metricsConfig(metricsConfig) .rowSchema(equalityDeleteRowSchema) @@ -186,7 +183,7 @@ public EqualityDeleteWriter newEqualityDeleteWriter( case PARQUET: Parquet.DeleteWriteBuilder parquetBuilder = - Parquet.writeDeletes(outputFile) + Parquet.writeDeletes(file) .setAll(properties) .metricsConfig(metricsConfig) .rowSchema(equalityDeleteRowSchema) @@ -203,7 +200,7 @@ public EqualityDeleteWriter newEqualityDeleteWriter( case ORC: ORC.DeleteWriteBuilder orcBuilder = - ORC.writeDeletes(outputFile) + ORC.writeDeletes(file) .setAll(properties) .metricsConfig(metricsConfig) .rowSchema(equalityDeleteRowSchema) @@ -230,7 +227,6 @@ public EqualityDeleteWriter newEqualityDeleteWriter( @Override public PositionDeleteWriter newPositionDeleteWriter( EncryptedOutputFile file, PartitionSpec spec, StructLike partition) { - OutputFile outputFile = file.encryptingOutputFile(); EncryptionKeyMetadata keyMetadata = file.keyMetadata(); Map properties = table.properties(); MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table); @@ -239,7 +235,7 @@ public PositionDeleteWriter newPositionDeleteWriter( switch (deleteFileFormat) { case AVRO: Avro.DeleteWriteBuilder avroBuilder = - Avro.writeDeletes(outputFile) + 
Avro.writeDeletes(file) .setAll(properties) .metricsConfig(metricsConfig) .rowSchema(positionDeleteRowSchema) @@ -254,7 +250,7 @@ public PositionDeleteWriter newPositionDeleteWriter( case PARQUET: Parquet.DeleteWriteBuilder parquetBuilder = - Parquet.writeDeletes(outputFile) + Parquet.writeDeletes(file) .setAll(properties) .metricsConfig(metricsConfig) .rowSchema(positionDeleteRowSchema) @@ -269,7 +265,7 @@ public PositionDeleteWriter newPositionDeleteWriter( case ORC: ORC.DeleteWriteBuilder orcBuilder = - ORC.writeDeletes(outputFile) + ORC.writeDeletes(file) .setAll(properties) .metricsConfig(metricsConfig) .rowSchema(positionDeleteRowSchema) diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index 55acd3200894..a6f6bf81b8b7 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -36,6 +36,7 @@ import org.apache.iceberg.deletes.DeleteCounter; import org.apache.iceberg.deletes.Deletes; import org.apache.iceberg.deletes.PositionDeleteIndex; +import org.apache.iceberg.encryption.StandardKeyMetadata; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; @@ -281,6 +282,12 @@ private CloseableIterable openDeletes(DeleteFile deleteFile, Schema dele builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath)); } + if (deleteFile.keyMetadata() != null) { + StandardKeyMetadata keyMetadata = StandardKeyMetadata.parse(deleteFile.keyMetadata()); + builder.withFileEncryptionKey(keyMetadata.encryptionKey()); + builder.withAADPrefix(keyMetadata.aadPrefix()); + } + return builder.build(); case ORC: diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index 23a94ebc9944..abad5d261d51 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -33,6 +33,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionUtil; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.OutputFile; @@ -84,11 +85,17 @@ public GenericAppenderFactory setAll(Map properties) { @Override public FileAppender newAppender(OutputFile outputFile, FileFormat fileFormat) { + return newAppender(EncryptionUtil.plainAsEncryptedOutput(outputFile), fileFormat); + } + + @Override + public FileAppender newAppender( + EncryptedOutputFile encryptedOutputFile, FileFormat fileFormat) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); try { switch (fileFormat) { case AVRO: - return Avro.write(outputFile) + return Avro.write(encryptedOutputFile) .schema(schema) .createWriterFunc(DataWriter::create) .metricsConfig(metricsConfig) @@ -97,7 +104,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo .build(); case PARQUET: - return Parquet.write(outputFile) + return Parquet.write(encryptedOutputFile) .schema(schema) .createWriterFunc(GenericParquetWriter::buildWriter) .setAll(config) @@ -106,7 +113,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo .build(); case ORC: - return 
ORC.write(outputFile) + return ORC.write(encryptedOutputFile) .schema(schema) .createWriterFunc(GenericOrcWriter::buildWriter) .setAll(config) @@ -127,7 +134,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo public org.apache.iceberg.io.DataWriter newDataWriter( EncryptedOutputFile file, FileFormat format, StructLike partition) { return new org.apache.iceberg.io.DataWriter<>( - newAppender(file.encryptingOutputFile(), format), + newAppender(file, format), format, file.encryptingOutputFile().location(), spec, @@ -146,10 +153,11 @@ public EqualityDeleteWriter newEqDeleteWriter( "Equality delete row schema shouldn't be null when creating equality-delete writer"); MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); + try { switch (format) { case AVRO: - return Avro.writeDeletes(file.encryptingOutputFile()) + return Avro.writeDeletes(file) .createWriterFunc(DataWriter::create) .withPartition(partition) .overwrite() @@ -161,7 +169,7 @@ public EqualityDeleteWriter newEqDeleteWriter( .buildEqualityWriter(); case ORC: - return ORC.writeDeletes(file.encryptingOutputFile()) + return ORC.writeDeletes(file) .createWriterFunc(GenericOrcWriter::buildWriter) .withPartition(partition) .overwrite() @@ -174,7 +182,7 @@ public EqualityDeleteWriter newEqDeleteWriter( .buildEqualityWriter(); case PARQUET: - return Parquet.writeDeletes(file.encryptingOutputFile()) + return Parquet.writeDeletes(file) .createWriterFunc(GenericParquetWriter::buildWriter) .withPartition(partition) .overwrite() @@ -199,10 +207,11 @@ public EqualityDeleteWriter newEqDeleteWriter( public PositionDeleteWriter newPosDeleteWriter( EncryptedOutputFile file, FileFormat format, StructLike partition) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); + try { switch (format) { case AVRO: - return Avro.writeDeletes(file.encryptingOutputFile()) + return Avro.writeDeletes(file) .createWriterFunc(DataWriter::create) .withPartition(partition) .overwrite() @@ -213,7 +222,7 @@ public PositionDeleteWriter newPosDeleteWriter( .buildPositionWriter(); case ORC: - return ORC.writeDeletes(file.encryptingOutputFile()) + return ORC.writeDeletes(file) .createWriterFunc(GenericOrcWriter::buildWriter) .withPartition(partition) .overwrite() @@ -224,7 +233,7 @@ public PositionDeleteWriter newPosDeleteWriter( .buildPositionWriter(); case PARQUET: - return Parquet.writeDeletes(file.encryptingOutputFile()) + return Parquet.writeDeletes(file) .createWriterFunc(GenericParquetWriter::buildWriter) .withPartition(partition) .overwrite() diff --git a/data/src/main/java/org/apache/iceberg/data/GenericReader.java b/data/src/main/java/org/apache/iceberg/data/GenericReader.java index bf2919f334a8..f7354f31782e 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericReader.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericReader.java @@ -29,6 +29,7 @@ import org.apache.iceberg.data.avro.DataReader; import org.apache.iceberg.data.orc.GenericOrcReader; import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.encryption.StandardKeyMetadata; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -126,6 +127,12 @@ private CloseableIterable openFile(FileScanTask task, Schema fileProject parquet.reuseContainers(); } + if (task.file().keyMetadata() != null) { + StandardKeyMetadata keyMetadata = StandardKeyMetadata.parse(task.file().keyMetadata()); + 
parquet.withFileEncryptionKey(keyMetadata.encryptionKey()); + parquet.withAADPrefix(keyMetadata.aadPrefix()); + } + return parquet.build(); case ORC: diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java index a95454b8b0ee..72480b212bf7 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java +++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java @@ -56,8 +56,8 @@ import org.apache.iceberg.data.avro.DataReader; import org.apache.iceberg.data.orc.GenericOrcReader; import org.apache.iceberg.data.parquet.GenericParquetReaders; -import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.StandardKeyMetadata; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -302,10 +302,7 @@ public void close() throws IOException { private CloseableIterable openTask(FileScanTask currentTask, Schema readSchema) { DataFile file = currentTask.file(); - InputFile inputFile = - encryptionManager.decrypt( - EncryptedFiles.encryptedInput( - io.newInputFile(file.path().toString()), file.keyMetadata())); + InputFile inputFile = io.newInputFile(file.path().toString()); CloseableIterable iterable; switch (file.format()) { @@ -419,6 +416,13 @@ private CloseableIterable newParquetIterable( if (nameMapping != null) { parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping)); } + + if (task.file().keyMetadata() != null) { + StandardKeyMetadata keyMetadata = StandardKeyMetadata.parse(task.file().keyMetadata()); + parquetReadBuilder.withFileEncryptionKey(keyMetadata.encryptionKey()); + parquetReadBuilder.withAADPrefix(keyMetadata.aadPrefix()); + } + parquetReadBuilder.createReaderFunc( fileSchema -> GenericParquetReaders.buildReader( diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index 89cd1ad4362a..0b25cdabf499 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -63,6 +63,7 @@ import org.apache.iceberg.data.orc.GenericOrcWriters; import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Expression; @@ -101,6 +102,13 @@ public static WriteBuilder write(OutputFile file) { return new WriteBuilder(file); } + public static WriteBuilder write(EncryptedOutputFile file) { + Preconditions.checkState( + file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY, + "Currently, encryption of data files in ORC format is not supported"); + return new WriteBuilder(file.encryptingOutputFile()); + } + public static class WriteBuilder { private final OutputFile file; private final Configuration conf; @@ -382,6 +390,13 @@ public static DataWriteBuilder writeData(OutputFile file) { return new DataWriteBuilder(file); } + public static DataWriteBuilder writeData(EncryptedOutputFile file) { + Preconditions.checkState( + file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY, + "Currently, encryption of data files in ORC format is not supported"); 
+ return new DataWriteBuilder(file.encryptingOutputFile()); + } + public static class DataWriteBuilder { private final WriteBuilder appenderBuilder; private final String location; @@ -479,6 +494,13 @@ public static DeleteWriteBuilder writeDeletes(OutputFile file) { return new DeleteWriteBuilder(file); } + public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) { + Preconditions.checkState( + file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY, + "Currently, encryption of delete files in ORC format is not supported"); + return new DeleteWriteBuilder(file.encryptingOutputFile()); + } + public static class DeleteWriteBuilder { private final WriteBuilder appenderBuilder; private final String location; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index d240c84b9e4d..5b5ffa858186 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -71,7 +71,9 @@ import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.encryption.StandardKeyMetadata; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.hadoop.HadoopInputFile; @@ -125,6 +127,17 @@ public static WriteBuilder write(OutputFile file) { return new WriteBuilder(file); } + public static WriteBuilder write(EncryptedOutputFile file) { + if (file.keyMetadata() instanceof StandardKeyMetadata) { + StandardKeyMetadata keyMetadata = (StandardKeyMetadata) file.keyMetadata(); + return write(file.plainOutputFile()) + .withFileEncryptionKey(keyMetadata.encryptionKey()) + .withAADPrefix(keyMetadata.aadPrefix()); + } else { + return write(file.encryptingOutputFile()); + } + } + public static class WriteBuilder { private final OutputFile file; private final Configuration conf; @@ -608,6 +621,17 @@ public static DataWriteBuilder writeData(OutputFile file) { return new DataWriteBuilder(file); } + public static DataWriteBuilder writeData(EncryptedOutputFile file) { + if (file.keyMetadata() instanceof StandardKeyMetadata) { + StandardKeyMetadata keyMetadata = (StandardKeyMetadata) file.keyMetadata(); + return writeData(file.plainOutputFile()) + .withFileEncryptionKey(keyMetadata.encryptionKey()) + .withAADPrefix(keyMetadata.aadPrefix()); + } else { + return writeData(file.encryptingOutputFile()); + } + } + public static class DataWriteBuilder { private final WriteBuilder appenderBuilder; private final String location; @@ -715,6 +739,17 @@ public static DeleteWriteBuilder writeDeletes(OutputFile file) { return new DeleteWriteBuilder(file); } + public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) { + if (file.keyMetadata() instanceof StandardKeyMetadata) { + StandardKeyMetadata keyMetadata = (StandardKeyMetadata) file.keyMetadata(); + return writeDeletes(file.plainOutputFile()) + .withFileEncryptionKey(keyMetadata.encryptionKey()) + .withAADPrefix(keyMetadata.aadPrefix()); + } else { + return writeDeletes(file.encryptingOutputFile()); + } + } + public static class DeleteWriteBuilder { private final WriteBuilder appenderBuilder; private final String location; diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index c05b694a60dc..135705fe42b3 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.spark.source; +import java.nio.ByteBuffer; import java.util.Map; import java.util.Set; import org.apache.iceberg.FileFormat; @@ -26,11 +27,15 @@ import org.apache.iceberg.ScanTaskGroup; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedInputFile; +import org.apache.iceberg.encryption.StandardKeyMetadata; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; @@ -53,6 +58,7 @@ abstract class BaseBatchReader extends BaseReader newBatchIterable( InputFile inputFile, + ByteBuffer keyMetadata, FileFormat format, long start, long length, @@ -61,9 +67,17 @@ protected CloseableIterable newBatchIterable( SparkDeleteFilter deleteFilter) { switch (format) { case PARQUET: - return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter); + return newParquetIterable( + EncryptedFiles.encryptedInput(inputFile, keyMetadata), + start, + length, + residual, + idToConstant, + deleteFilter); case ORC: + Preconditions.checkState( + keyMetadata == null, "Encryption currently not supported with ORC format"); return newOrcIterable(inputFile, start, length, residual, idToConstant); default: @@ -73,7 +87,7 @@ protected CloseableIterable newBatchIterable( } private CloseableIterable newParquetIterable( - InputFile inputFile, + EncryptedInputFile inputFile, long start, long length, Expression residual, @@ -82,7 +96,15 @@ private CloseableIterable newParquetIterable( // get required schema if there are deletes Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema(); - return Parquet.read(inputFile) + ByteBuffer fileEncryptionKey = null; + ByteBuffer aadPrefix = null; + if (inputFile.keyMetadata() != null && inputFile.keyMetadata().buffer() != null) { + StandardKeyMetadata keyMetadata = StandardKeyMetadata.parse(inputFile.keyMetadata().buffer()); + fileEncryptionKey = keyMetadata.encryptionKey(); + aadPrefix = keyMetadata.aadPrefix(); + } + + return Parquet.read(inputFile.encryptedInputFile()) .project(requiredSchema) .split(start, length) .createBatchedReaderFunc( @@ -97,6 +119,8 @@ private CloseableIterable newParquetIterable( // read performance as every batch read doesn't have to pay the cost of allocating memory. 
.reuseContainers() .withNameMapping(nameMapping()) + .withFileEncryptionKey(fileEncryptionKey) + .withAADPrefix(aadPrefix) .build(); } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java index 927084caea1c..68ad06a7b1ee 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.spark.source; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; @@ -26,6 +27,9 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedInputFile; +import org.apache.iceberg.encryption.StandardKeyMetadata; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; @@ -50,6 +54,7 @@ abstract class BaseRowReader extends BaseReader newIterable( InputFile file, + ByteBuffer encryptionKeyMetadata, FileFormat format, long start, long length, @@ -58,7 +63,13 @@ protected CloseableIterable newIterable( Map idToConstant) { switch (format) { case PARQUET: - return newParquetIterable(file, start, length, residual, projection, idToConstant); + return newParquetIterable( + EncryptedFiles.encryptedInput(file, encryptionKeyMetadata), + start, + length, + residual, + projection, + idToConstant); case AVRO: return newAvroIterable(file, start, length, projection, idToConstant); @@ -83,13 +94,21 @@ private CloseableIterable newAvroIterable( } private CloseableIterable newParquetIterable( - InputFile file, + EncryptedInputFile inputFile, long start, long length, Expression residual, Schema readSchema, Map idToConstant) { - return Parquet.read(file) + ByteBuffer fileEncryptionKey = null; + ByteBuffer aadPrefix = null; + if (inputFile.keyMetadata() != null && inputFile.keyMetadata().buffer() != null) { + StandardKeyMetadata keyMetadata = StandardKeyMetadata.parse(inputFile.keyMetadata().buffer()); + fileEncryptionKey = keyMetadata.encryptionKey(); + aadPrefix = keyMetadata.aadPrefix(); + } + + return Parquet.read(inputFile.encryptedInputFile()) .reuseContainers() .split(start, length) .project(readSchema) @@ -98,6 +117,8 @@ private CloseableIterable newParquetIterable( .filter(residual) .caseSensitive(caseSensitive()) .withNameMapping(nameMapping()) + .withFileEncryptionKey(fileEncryptionKey) + .withAADPrefix(aadPrefix) .build(); } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index 389ad1d5a2d9..4e6e14b76d12 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -100,6 +100,7 @@ protected CloseableIterator open(FileScanTask task) { return newBatchIterable( inputFile, + task.file().keyMetadata(), task.file().format(), task.start(), task.length(), diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java index 572f955884a3..307824028e81 100644 
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java @@ -133,6 +133,7 @@ private CloseableIterable rows(ContentScanTask task, Sche Preconditions.checkNotNull(location, "Could not find InputFile"); return newIterable( location, + task.file().keyMetadata(), task.file().format(), task.start(), task.length(), diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java index 4b847474153c..ad034d744dc6 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java @@ -92,6 +92,7 @@ protected CloseableIterator open(PositionDeletesScanTask task) { return newIterable( inputFile, + task.file().keyMetadata(), task.file().format(), task.start(), task.length(), diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index 9356f62f3593..0b11b3332b37 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -105,6 +105,7 @@ protected CloseableIterable open( inputFile, "Could not find InputFile associated with FileScanTask"); return newIterable( inputFile, + task.file().keyMetadata(), task.file().format(), task.start(), task.length(), diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java index 6372edde0782..8557ce331b3d 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java @@ -31,6 +31,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionUtil; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.DeleteSchemaUtil; @@ -161,7 +162,12 @@ private StructType lazyPosDeleteSparkType() { } @Override - public FileAppender newAppender(OutputFile file, FileFormat fileFormat) { + public FileAppender newAppender(OutputFile outputFile, FileFormat format) { + return newAppender(EncryptionUtil.plainAsEncryptedOutput(outputFile), format); + } + + @Override + public FileAppender newAppender(EncryptedOutputFile file, FileFormat fileFormat) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties); try { switch (fileFormat) { @@ -203,7 +209,7 @@ public FileAppender newAppender(OutputFile file, FileFormat fileFor public DataWriter newDataWriter( EncryptedOutputFile file, FileFormat format, StructLike partition) { return new DataWriter<>( - newAppender(file.encryptingOutputFile(), format), + newAppender(file, format), format, file.encryptingOutputFile().location(), spec, @@ -224,7 +230,7 @@ public EqualityDeleteWriter newEqDeleteWriter( try { switch (format) { case PARQUET: - return Parquet.writeDeletes(file.encryptingOutputFile()) + return 
Parquet.writeDeletes(file) .createWriterFunc( msgType -> SparkParquetWriters.buildWriter(lazyEqDeleteSparkType(), msgType)) .overwrite() @@ -236,7 +242,7 @@ public EqualityDeleteWriter newEqDeleteWriter( .buildEqualityWriter(); case AVRO: - return Avro.writeDeletes(file.encryptingOutputFile()) + return Avro.writeDeletes(file) .createWriterFunc(ignored -> new SparkAvroWriter(lazyEqDeleteSparkType())) .overwrite() .rowSchema(eqDeleteRowSchema) @@ -247,7 +253,7 @@ public EqualityDeleteWriter newEqDeleteWriter( .buildEqualityWriter(); case ORC: - return ORC.writeDeletes(file.encryptingOutputFile()) + return ORC.writeDeletes(file) .createWriterFunc(SparkOrcWriter::new) .overwrite() .rowSchema(eqDeleteRowSchema) @@ -274,7 +280,7 @@ public PositionDeleteWriter newPosDeleteWriter( case PARQUET: StructType sparkPosDeleteSchema = SparkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema)); - return Parquet.writeDeletes(file.encryptingOutputFile()) + return Parquet.writeDeletes(file) .createWriterFunc( msgType -> SparkParquetWriters.buildWriter(sparkPosDeleteSchema, msgType)) .overwrite() @@ -286,7 +292,7 @@ public PositionDeleteWriter newPosDeleteWriter( .buildPositionWriter(); case AVRO: - return Avro.writeDeletes(file.encryptingOutputFile()) + return Avro.writeDeletes(file) .createWriterFunc(ignored -> new SparkAvroWriter(lazyPosDeleteSparkType())) .overwrite() .rowSchema(posDeleteRowSchema) @@ -296,7 +302,7 @@ public PositionDeleteWriter newPosDeleteWriter( .buildPositionWriter(); case ORC: - return ORC.writeDeletes(file.encryptingOutputFile()) + return ORC.writeDeletes(file) .createWriterFunc(SparkOrcWriter::new) .overwrite() .rowSchema(posDeleteRowSchema) From a2a56423fcb27ed3c6200269ae7c82a87a724625 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 3 Jan 2024 13:31:07 -0800 Subject: [PATCH 2/2] Add native subclasses for InputFile and OutputFile to simplify. 
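
Instead of threading a nativeDataEncryption flag through StandardEncryptionManager, file formats can detect format-native encryption support with an instanceof check against the new NativeEncryptionInputFile / NativeEncryptionOutputFile / NativeEncryptionKeyMetadata interfaces. A rough sketch of the write-side pattern this enables, illustrative only (the exact builder changes are in the Parquet.java diff below):

    // sketch of a format's write(EncryptedOutputFile) factory method
    if (file instanceof NativeEncryptionOutputFile) {
      NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file;
      NativeEncryptionKeyMetadata keyMetadata = nativeFile.keyMetadata();
      // hand the plaintext stream and key material to the format's own encryption
      return write(nativeFile.plainOutputFile())
          .withFileEncryptionKey(keyMetadata.encryptionKey())
          .withAADPrefix(keyMetadata.aadPrefix());
    } else {
      return write(file.encryptingOutputFile());
    }

Formats without native encryption, such as ORC, can reject native-encryption output with a single instanceof check instead of comparing key metadata against EncryptionKeyMetadata.EMPTY.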
--- .../encryption/EncryptedOutputFile.java | 5 --- .../java/org/apache/iceberg/avro/Avro.java | 6 +-- .../iceberg/encryption/EncryptionUtil.java | 12 +----- .../encryption/NativeEncryptionInputFile.java | 27 ++++++++++++ .../NativeEncryptionKeyMetadata.java | 30 +++++++++++++ .../NativeEncryptionOutputFile.java | 30 +++++++++++++ .../encryption/StandardEncryptionManager.java | 34 ++++++--------- .../encryption/StandardKeyMetadata.java | 6 ++- .../org/apache/iceberg/data/DeleteFilter.java | 7 ---- .../apache/iceberg/data/GenericReader.java | 7 ---- .../flink/sink/FlinkAppenderFactory.java | 20 +++++---- .../mr/mapreduce/IcebergInputFormat.java | 14 +++---- .../main/java/org/apache/iceberg/orc/ORC.java | 14 ++++--- .../org/apache/iceberg/parquet/Parquet.java | 42 +++++++++++-------- .../iceberg/spark/source/BaseBatchReader.java | 30 ++----------- .../iceberg/spark/source/BaseRowReader.java | 27 ++---------- .../iceberg/spark/source/BatchDataReader.java | 1 - .../spark/source/ChangelogRowReader.java | 1 - .../source/PositionDeletesRowReader.java | 1 - .../iceberg/spark/source/RowDataReader.java | 1 - .../spark/source/SparkAppenderFactory.java | 4 +- 21 files changed, 167 insertions(+), 152 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionInputFile.java create mode 100644 core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java create mode 100644 core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionOutputFile.java diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java index 300c88d18862..1686342c776d 100644 --- a/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java +++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java @@ -37,9 +37,4 @@ public interface EncryptedOutputFile { * #encryptingOutputFile()}. */ EncryptionKeyMetadata keyMetadata(); - - /** Underlying output file for native encryption. 
*/ - default OutputFile plainOutputFile() { - throw new UnsupportedOperationException("Not implemented"); - } } diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 1c50515c8155..b10fa09c82a0 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -95,7 +95,7 @@ public static WriteBuilder write(OutputFile file) { public static WriteBuilder write(EncryptedOutputFile file) { Preconditions.checkState( file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY, - "Currenty, encryption of data files in Avro format is not supported"); + "Avro encryption is not supported"); return new WriteBuilder(file.encryptingOutputFile()); } @@ -283,7 +283,7 @@ public static DataWriteBuilder writeData(OutputFile file) { public static DataWriteBuilder writeData(EncryptedOutputFile file) { Preconditions.checkState( file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY, - "Currenty, encryption of data files in Avro format is not supported"); + "Avro encryption is not supported"); return new DataWriteBuilder(file.encryptingOutputFile()); } @@ -386,7 +386,7 @@ public static DeleteWriteBuilder writeDeletes(OutputFile file) { public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) { Preconditions.checkState( file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY, - "Currenty, encryption of delete files in Avro format is not supported"); + "Avro encryption is not supported"); return new DeleteWriteBuilder(file.encryptingOutputFile()); } diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java index a538a865f0a2..e2cf98bf767f 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -20,7 +20,6 @@ import java.util.Map; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.TableProperties; import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.io.OutputFile; @@ -81,14 +80,6 @@ public static EncryptionManager createEncryptionManager( return PlaintextEncryptionManager.instance(); } - String fileFormat = - PropertyUtil.propertyAsString( - tableProperties, - TableProperties.DEFAULT_FILE_FORMAT, - TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); - - boolean nativeDataEncryption = (FileFormat.fromString(fileFormat) == FileFormat.PARQUET); - int dataKeyLength = PropertyUtil.propertyAsInt( tableProperties, @@ -100,8 +91,7 @@ public static EncryptionManager createEncryptionManager( "Invalid data key length: %s (must be 16, 24, or 32)", dataKeyLength); - return new StandardEncryptionManager( - tableKeyId, dataKeyLength, kmsClient, nativeDataEncryption); + return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient); } public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) { diff --git a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionInputFile.java b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionInputFile.java new file mode 100644 index 000000000000..15ead6bac2b9 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionInputFile.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import org.apache.iceberg.io.InputFile; + +/** An {@link EncryptedInputFile} that can be used for format-native encryption. */ +public interface NativeEncryptionInputFile extends EncryptedInputFile, InputFile { + @Override + NativeEncryptionKeyMetadata keyMetadata(); +} diff --git a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java new file mode 100644 index 000000000000..c2ed9d564d1e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionKeyMetadata.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import java.nio.ByteBuffer; + +/** {@link EncryptionKeyMetadata} for use with format-native encryption. */ +public interface NativeEncryptionKeyMetadata extends EncryptionKeyMetadata { + /** Encryption key as a {@link ByteBuffer} */ + ByteBuffer encryptionKey(); + + /** Additional authentication data as a {@link ByteBuffer} */ + ByteBuffer aadPrefix(); +} diff --git a/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionOutputFile.java b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionOutputFile.java new file mode 100644 index 000000000000..0d0d5da8a677 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/NativeEncryptionOutputFile.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import org.apache.iceberg.io.OutputFile; + +/** An {@link EncryptedOutputFile} that can be used for format-native encryption. */ +public interface NativeEncryptionOutputFile extends EncryptedOutputFile { + @Override + NativeEncryptionKeyMetadata keyMetadata(); + + /** An {@link OutputFile} instance for the underlying (plaintext) output stream. */ + OutputFile plainOutputFile(); +} diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index 612bd20119b4..185f4d6f81bb 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -32,7 +32,6 @@ public class StandardEncryptionManager implements EncryptionManager { private final transient KeyManagementClient kmsClient; private final String tableKeyId; private final int dataKeyLength; - private final boolean nativeDataEncryption; private transient volatile SecureRandom lazyRNG = null; @@ -42,10 +41,7 @@ public class StandardEncryptionManager implements EncryptionManager { * @param kmsClient Client of KMS used to wrap/unwrap keys in envelope encryption */ public StandardEncryptionManager( - String tableKeyId, - int dataKeyLength, - KeyManagementClient kmsClient, - boolean nativeDataEncryption) { + String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null"); Preconditions.checkArgument( dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32, @@ -55,32 +51,22 @@ public StandardEncryptionManager( this.tableKeyId = tableKeyId; this.kmsClient = kmsClient; this.dataKeyLength = dataKeyLength; - this.nativeDataEncryption = nativeDataEncryption; } @Override - public EncryptedOutputFile encrypt(OutputFile plainOutput) { + public NativeEncryptionOutputFile encrypt(OutputFile plainOutput) { return new StandardEncryptedOutputFile(plainOutput, dataKeyLength); } @Override - public InputFile decrypt(EncryptedInputFile encrypted) { + public NativeEncryptionInputFile decrypt(EncryptedInputFile encrypted) { // this input file will lazily parse key metadata in case the file is not an AES GCM stream. return new StandardDecryptedInputFile(encrypted); } @Override public Iterable decrypt(Iterable encrypted) { - // Bulk decrypt is only applied to data files. Returning source input files for parquet. 
- if (nativeDataEncryption) { - return Iterables.transform(encrypted, this::getSourceFile); - } else { - return Iterables.transform(encrypted, this::decrypt); - } - } - - private InputFile getSourceFile(EncryptedInputFile encryptedFile) { - return encryptedFile.encryptedInputFile(); + return Iterables.transform(encrypted, this::decrypt); } private SecureRandom workerRNG() { @@ -109,7 +95,7 @@ public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) { return kmsClient.unwrapKey(wrappedSecretKey, tableKeyId); } - private class StandardEncryptedOutputFile implements EncryptedOutputFile { + private class StandardEncryptedOutputFile implements NativeEncryptionOutputFile { private final OutputFile plainOutputFile; private final int dataKeyLength; private StandardKeyMetadata lazyKeyMetadata = null; @@ -154,7 +140,7 @@ public OutputFile plainOutputFile() { } } - private static class StandardDecryptedInputFile implements InputFile { + private static class StandardDecryptedInputFile implements NativeEncryptionInputFile { private final EncryptedInputFile encryptedInputFile; private StandardKeyMetadata lazyKeyMetadata = null; private AesGcmInputFile lazyDecryptedInputFile = null; @@ -163,7 +149,13 @@ private StandardDecryptedInputFile(EncryptedInputFile encryptedInputFile) { this.encryptedInputFile = encryptedInputFile; } - private StandardKeyMetadata keyMetadata() { + @Override + public InputFile encryptedInputFile() { + return encryptedInputFile.encryptedInputFile(); + } + + @Override + public StandardKeyMetadata keyMetadata() { if (null == lazyKeyMetadata) { this.lazyKeyMetadata = StandardKeyMetadata.castOrParse(encryptedInputFile.keyMetadata()); } diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java index 964bf2f03a45..08466f75fe21 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardKeyMetadata.java @@ -31,7 +31,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types; -public class StandardKeyMetadata implements EncryptionKeyMetadata, IndexedRecord { +class StandardKeyMetadata implements NativeEncryptionKeyMetadata, IndexedRecord { private static final byte V1 = 1; private static final Schema SCHEMA_V1 = new Schema( @@ -73,10 +73,12 @@ static Map supportedAvroSchemaVersions() { return avroSchemaVersions; } + @Override public ByteBuffer encryptionKey() { return encryptionKey; } + @Override public ByteBuffer aadPrefix() { return aadPrefix; } @@ -95,7 +97,7 @@ static StandardKeyMetadata castOrParse(EncryptionKeyMetadata keyMetadata) { return parse(kmBuffer); } - public static StandardKeyMetadata parse(ByteBuffer buffer) { + static StandardKeyMetadata parse(ByteBuffer buffer) { try { return KEY_METADATA_DECODER.decode(buffer); } catch (IOException e) { diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index a6f6bf81b8b7..55acd3200894 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -36,7 +36,6 @@ import org.apache.iceberg.deletes.DeleteCounter; import org.apache.iceberg.deletes.Deletes; import org.apache.iceberg.deletes.PositionDeleteIndex; -import org.apache.iceberg.encryption.StandardKeyMetadata; import org.apache.iceberg.expressions.Expressions; import 
org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; @@ -282,12 +281,6 @@ private CloseableIterable openDeletes(DeleteFile deleteFile, Schema dele builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath)); } - if (deleteFile.keyMetadata() != null) { - StandardKeyMetadata keyMetadata = StandardKeyMetadata.parse(deleteFile.keyMetadata()); - builder.withFileEncryptionKey(keyMetadata.encryptionKey()); - builder.withAADPrefix(keyMetadata.aadPrefix()); - } - return builder.build(); case ORC: diff --git a/data/src/main/java/org/apache/iceberg/data/GenericReader.java b/data/src/main/java/org/apache/iceberg/data/GenericReader.java index f7354f31782e..bf2919f334a8 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericReader.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericReader.java @@ -29,7 +29,6 @@ import org.apache.iceberg.data.avro.DataReader; import org.apache.iceberg.data.orc.GenericOrcReader; import org.apache.iceberg.data.parquet.GenericParquetReaders; -import org.apache.iceberg.encryption.StandardKeyMetadata; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -127,12 +126,6 @@ private CloseableIterable openFile(FileScanTask task, Schema fileProject parquet.reuseContainers(); } - if (task.file().keyMetadata() != null) { - StandardKeyMetadata keyMetadata = StandardKeyMetadata.parse(task.file().keyMetadata()); - parquet.withFileEncryptionKey(keyMetadata.encryptionKey()); - parquet.withAADPrefix(keyMetadata.aadPrefix()); - } - return parquet.build(); case ORC: diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java index b6f1392d1562..eacef58a8d5d 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java @@ -35,6 +35,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionUtil; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.data.FlinkAvroWriter; import org.apache.iceberg.flink.data.FlinkOrcWriter; @@ -99,6 +100,11 @@ private RowType lazyPosDeleteFlinkSchema() { @Override public FileAppender newAppender(OutputFile outputFile, FileFormat format) { + return newAppender(EncryptionUtil.plainAsEncryptedOutput(outputFile), format); + } + + @Override + public FileAppender newAppender(EncryptedOutputFile outputFile, FileFormat format) { MetricsConfig metricsConfig = MetricsConfig.forTable(table); try { switch (format) { @@ -142,7 +148,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat forma public DataWriter newDataWriter( EncryptedOutputFile file, FileFormat format, StructLike partition) { return new DataWriter<>( - newAppender(file.encryptingOutputFile(), format), + newAppender(file, format), format, file.encryptingOutputFile().location(), spec, @@ -164,7 +170,7 @@ public EqualityDeleteWriter newEqDeleteWriter( try { switch (format) { case AVRO: - return Avro.writeDeletes(outputFile.encryptingOutputFile()) + return Avro.writeDeletes(outputFile) .createWriterFunc(ignore -> new FlinkAvroWriter(lazyEqDeleteFlinkSchema())) 
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index b6f1392d1562..eacef58a8d5d 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.data.FlinkAvroWriter;
 import org.apache.iceberg.flink.data.FlinkOrcWriter;
@@ -99,6 +100,11 @@ private RowType lazyPosDeleteFlinkSchema() {
 
   @Override
   public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
+    return newAppender(EncryptionUtil.plainAsEncryptedOutput(outputFile), format);
+  }
+
+  @Override
+  public FileAppender<RowData> newAppender(EncryptedOutputFile outputFile, FileFormat format) {
     MetricsConfig metricsConfig = MetricsConfig.forTable(table);
     try {
       switch (format) {
@@ -142,7 +148,7 @@ public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat forma
   public DataWriter<RowData> newDataWriter(
       EncryptedOutputFile file, FileFormat format, StructLike partition) {
     return new DataWriter<>(
-        newAppender(file.encryptingOutputFile(), format),
+        newAppender(file, format),
         format,
         file.encryptingOutputFile().location(),
         spec,
@@ -164,7 +170,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
     try {
       switch (format) {
         case AVRO:
-          return Avro.writeDeletes(outputFile.encryptingOutputFile())
+          return Avro.writeDeletes(outputFile)
              .createWriterFunc(ignore -> new FlinkAvroWriter(lazyEqDeleteFlinkSchema()))
              .withPartition(partition)
              .overwrite()
@@ -177,7 +183,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
              .buildEqualityWriter();
 
         case ORC:
-          return ORC.writeDeletes(outputFile.encryptingOutputFile())
+          return ORC.writeDeletes(outputFile)
              .createWriterFunc(
                  (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema))
              .withPartition(partition)
@@ -191,7 +197,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
              .buildEqualityWriter();
 
         case PARQUET:
-          return Parquet.writeDeletes(outputFile.encryptingOutputFile())
+          return Parquet.writeDeletes(outputFile)
              .createWriterFunc(
                  msgType -> FlinkParquetWriters.buildWriter(lazyEqDeleteFlinkSchema(), msgType))
              .withPartition(partition)
@@ -220,7 +226,7 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(
     try {
       switch (format) {
         case AVRO:
-          return Avro.writeDeletes(outputFile.encryptingOutputFile())
+          return Avro.writeDeletes(outputFile)
              .createWriterFunc(ignore -> new FlinkAvroWriter(lazyPosDeleteFlinkSchema()))
              .withPartition(partition)
              .overwrite()
@@ -234,7 +240,7 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(
 
         case ORC:
           RowType orcPosDeleteSchema =
               FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
-          return ORC.writeDeletes(outputFile.encryptingOutputFile())
+          return ORC.writeDeletes(outputFile)
              .createWriterFunc(
                  (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(orcPosDeleteSchema, iSchema))
              .withPartition(partition)
@@ -250,7 +256,7 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(
 
         case PARQUET:
           RowType flinkPosDeleteSchema =
               FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
-          return Parquet.writeDeletes(outputFile.encryptingOutputFile())
+          return Parquet.writeDeletes(outputFile)
              .createWriterFunc(
                  msgType -> FlinkParquetWriters.buildWriter(flinkPosDeleteSchema, msgType))
              .withPartition(partition)
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 72480b212bf7..a95454b8b0ee 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -56,8 +56,8 @@ import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.encryption.StandardKeyMetadata;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -302,7 +302,10 @@ public void close() throws IOException {
 
     private CloseableIterable<T> openTask(FileScanTask currentTask, Schema readSchema) {
       DataFile file = currentTask.file();
-      InputFile inputFile = io.newInputFile(file.path().toString());
+      InputFile inputFile =
+          encryptionManager.decrypt(
+              EncryptedFiles.encryptedInput(
+                  io.newInputFile(file.path().toString()), file.keyMetadata()));
 
       CloseableIterable<T> iterable;
       switch (file.format()) {
@@ -416,13 +419,6 @@ private CloseableIterable<T> newParquetIterable(
       if (nameMapping != null) {
         parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
       }
-
-      if (task.file().keyMetadata() != null) {
-        StandardKeyMetadata keyMetadata = StandardKeyMetadata.parse(task.file().keyMetadata());
-        parquetReadBuilder.withFileEncryptionKey(keyMetadata.encryptionKey());
-        parquetReadBuilder.withAADPrefix(keyMetadata.aadPrefix());
-      }
-
       parquetReadBuilder.createReaderFunc(
           fileSchema ->
               GenericParquetReaders.buildReader(
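After this change the mr reader no longer parses StandardKeyMetadata itself: it pairs the raw file with the key metadata recorded in the manifest and lets the table's EncryptionManager hand back a readable InputFile. A compact sketch of that read-side flow, assuming the surrounding context supplies the FileIO, EncryptionManager, and DataFile (the class and variable names are stand-ins):

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.encryption.EncryptedFiles;
    import org.apache.iceberg.encryption.EncryptionManager;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.io.InputFile;

    class DecryptExample {
      static InputFile open(FileIO io, EncryptionManager encryptionManager, DataFile dataFile) {
        // For native Parquet encryption the StandardEncryptionManager returns a
        // NativeEncryptionInputFile that still carries the key; for other cases
        // it returns a stream-decrypting file, or the plain file when the data
        // file has no key metadata.
        return encryptionManager.decrypt(
            EncryptedFiles.encryptedInput(
                io.newInputFile(dataFile.path().toString()), dataFile.keyMetadata()));
      }
    }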
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 0b25cdabf499..7d1405bbd45b 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -65,6 +65,8 @@ import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.encryption.NativeEncryptionInputFile;
+import org.apache.iceberg.encryption.NativeEncryptionOutputFile;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.hadoop.HadoopInputFile;
@@ -104,8 +106,7 @@ public static WriteBuilder write(OutputFile file) {
 
   public static WriteBuilder write(EncryptedOutputFile file) {
     Preconditions.checkState(
-        file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
-        "Currenty, encryption of data files in ORC format is not supported");
+        !(file instanceof NativeEncryptionOutputFile), "Native ORC encryption is not supported");
 
     return new WriteBuilder(file.encryptingOutputFile());
   }
@@ -392,8 +393,7 @@ public static DataWriteBuilder writeData(OutputFile file) {
 
   public static DataWriteBuilder writeData(EncryptedOutputFile file) {
     Preconditions.checkState(
-        file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
-        "Currenty, encryption of data files in ORC format is not supported");
+        !(file instanceof NativeEncryptionOutputFile), "Native ORC encryption is not supported");
 
     return new DataWriteBuilder(file.encryptingOutputFile());
   }
@@ -496,8 +496,7 @@ public static DeleteWriteBuilder writeDeletes(OutputFile file) {
 
   public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) {
     Preconditions.checkState(
-        file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
-        "Currenty, encryption of delete files in ORC format is not supported");
+        !(file instanceof NativeEncryptionOutputFile), "Native ORC encryption is not supported");
 
     return new DeleteWriteBuilder(file.encryptingOutputFile());
   }
@@ -679,6 +678,9 @@ public <T> PositionDeleteWriter<T> buildPositionWriter() {
   }
 
   public static ReadBuilder read(InputFile file) {
+    Preconditions.checkState(
+        !(file instanceof NativeEncryptionInputFile), "Native ORC encryption is not supported");
+
     return new ReadBuilder(file);
   }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 5b5ffa858186..f80810acdd7f 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -73,7 +73,8 @@ import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionKeyMetadata;
-import org.apache.iceberg.encryption.StandardKeyMetadata;
+import org.apache.iceberg.encryption.NativeEncryptionInputFile;
+import org.apache.iceberg.encryption.NativeEncryptionOutputFile;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.hadoop.HadoopInputFile;
@@ -128,11 +129,11 @@ public static WriteBuilder write(OutputFile file) {
   }
 
   public static WriteBuilder write(EncryptedOutputFile file) {
-    if (file.keyMetadata() instanceof StandardKeyMetadata) {
-      StandardKeyMetadata keyMetadata = (StandardKeyMetadata) file.keyMetadata();
-      return write(file.plainOutputFile())
-          .withFileEncryptionKey(keyMetadata.encryptionKey())
-          .withAADPrefix(keyMetadata.aadPrefix());
+    if (file instanceof NativeEncryptionOutputFile) {
+      NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file;
+      return write(nativeFile.plainOutputFile())
+          .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
+          .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
     } else {
       return write(file.encryptingOutputFile());
     }
@@ -622,11 +623,11 @@ public static DataWriteBuilder writeData(OutputFile file) {
   }
 
   public static DataWriteBuilder writeData(EncryptedOutputFile file) {
-    if (file.keyMetadata() instanceof StandardKeyMetadata) {
-      StandardKeyMetadata keyMetadata = (StandardKeyMetadata) file.keyMetadata();
-      return writeData(file.plainOutputFile())
-          .withFileEncryptionKey(keyMetadata.encryptionKey())
-          .withAADPrefix(keyMetadata.aadPrefix());
+    if (file instanceof NativeEncryptionOutputFile) {
+      NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file;
+      return writeData(nativeFile.plainOutputFile())
+          .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
+          .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
     } else {
       return writeData(file.encryptingOutputFile());
     }
@@ -740,11 +741,11 @@ public static DeleteWriteBuilder writeDeletes(OutputFile file) {
   }
 
   public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) {
-    if (file.keyMetadata() instanceof StandardKeyMetadata) {
-      StandardKeyMetadata keyMetadata = (StandardKeyMetadata) file.keyMetadata();
-      return writeDeletes(file.plainOutputFile())
-          .withFileEncryptionKey(keyMetadata.encryptionKey())
-          .withAADPrefix(keyMetadata.aadPrefix());
+    if (file instanceof NativeEncryptionOutputFile) {
+      NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file;
+      return writeDeletes(nativeFile.plainOutputFile())
+          .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
+          .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
     } else {
       return writeDeletes(file.encryptingOutputFile());
     }
@@ -992,7 +993,14 @@ protected WriteSupport<T> getWriteSupport(Configuration configuration) {
   }
 
   public static ReadBuilder read(InputFile file) {
-    return new ReadBuilder(file);
+    if (file instanceof NativeEncryptionInputFile) {
+      NativeEncryptionInputFile nativeFile = (NativeEncryptionInputFile) file;
+      return new ReadBuilder(nativeFile.encryptedInputFile())
+          .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
+          .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
+    } else {
+      return new ReadBuilder(file);
+    }
   }
 
   public static class ReadBuilder {
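Because Parquet.read() now unwraps a NativeEncryptionInputFile by itself, callers such as the Spark readers below can drop their withFileEncryptionKey()/withAADPrefix() plumbing. A hedged usage sketch of the new caller shape; the projection schema and reader function are illustrative, not part of this patch:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.data.parquet.GenericParquetReaders;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.io.InputFile;
    import org.apache.iceberg.parquet.Parquet;

    class ReadExample {
      static CloseableIterable<Record> read(InputFile decrypted, Schema projectionSchema) {
        // decrypted may be a NativeEncryptionInputFile produced by
        // EncryptionManager.decrypt(); Parquet.read() extracts the key and AAD
        // prefix internally, so no explicit encryption configuration is needed.
        return Parquet.read(decrypted)
            .project(projectionSchema)
            .createReaderFunc(
                fileSchema -> GenericParquetReaders.buildReader(projectionSchema, fileSchema))
            .build();
      }
    }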
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index 135705fe42b3..c05b694a60dc 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.spark.source;
 
-import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
@@ -27,15 +26,11 @@ import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.EncryptedFiles;
-import org.apache.iceberg.encryption.EncryptedInputFile;
-import org.apache.iceberg.encryption.StandardKeyMetadata;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
@@ -58,7 +53,6 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
   protected CloseableIterable<ColumnarBatch> newBatchIterable(
       InputFile inputFile,
-      ByteBuffer keyMetadata,
       FileFormat format,
       long start,
       long length,
@@ -67,17 +61,9 @@ protected CloseableIterable<ColumnarBatch> newBatchIterable(
       SparkDeleteFilter deleteFilter) {
     switch (format) {
       case PARQUET:
-        return newParquetIterable(
-            EncryptedFiles.encryptedInput(inputFile, keyMetadata),
-            start,
-            length,
-            residual,
-            idToConstant,
-            deleteFilter);
+        return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);
 
       case ORC:
-        Preconditions.checkState(
-            keyMetadata == null, "Encryption currently not supported with ORC format");
         return newOrcIterable(inputFile, start, length, residual, idToConstant);
 
       default:
@@ -87,7 +73,7 @@ protected CloseableIterable<ColumnarBatch> newBatchIterable(
   }
 
   private CloseableIterable<ColumnarBatch> newParquetIterable(
-      EncryptedInputFile inputFile,
+      InputFile inputFile,
       long start,
       long length,
       Expression residual,
@@ -96,15 +82,7 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
     // get required schema if there are deletes
     Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
 
-    ByteBuffer fileEncryptionKey = null;
-    ByteBuffer aadPrefix = null;
-    if (inputFile.keyMetadata() != null && inputFile.keyMetadata().buffer() != null) {
-      StandardKeyMetadata keyMetadata = StandardKeyMetadata.parse(inputFile.keyMetadata().buffer());
-      fileEncryptionKey = keyMetadata.encryptionKey();
-      aadPrefix = keyMetadata.aadPrefix();
-    }
-
-    return Parquet.read(inputFile.encryptedInputFile())
+    return Parquet.read(inputFile)
        .project(requiredSchema)
        .split(start, length)
        .createBatchedReaderFunc(
@@ -119,8 +97,6 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
        // read performance as every batch read doesn't have to pay the cost of allocating memory.
        .reuseContainers()
        .withNameMapping(nameMapping())
-        .withFileEncryptionKey(fileEncryptionKey)
-        .withAADPrefix(aadPrefix)
        .build();
   }
 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
index 68ad06a7b1ee..927084caea1c 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.spark.source;
 
-import java.nio.ByteBuffer;
 import java.util.Map;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
@@ -27,9 +26,6 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.encryption.EncryptedFiles;
-import org.apache.iceberg.encryption.EncryptedInputFile;
-import org.apache.iceberg.encryption.StandardKeyMetadata;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
@@ -54,7 +50,6 @@ abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow,
   protected CloseableIterable<InternalRow> newIterable(
       InputFile file,
-      ByteBuffer encryptionKeyMetadata,
       FileFormat format,
       long start,
       long length,
@@ -63,13 +58,7 @@ protected CloseableIterable<InternalRow> newIterable(
       Map<Integer, ?> idToConstant) {
     switch (format) {
       case PARQUET:
-        return newParquetIterable(
-            EncryptedFiles.encryptedInput(file, encryptionKeyMetadata),
-            start,
-            length,
-            residual,
-            projection,
-            idToConstant);
+        return newParquetIterable(file, start, length, residual, projection, idToConstant);
 
       case AVRO:
         return newAvroIterable(file, start, length, projection, idToConstant);
@@ -94,21 +83,13 @@ private CloseableIterable<InternalRow> newAvroIterable(
   }
 
   private CloseableIterable<InternalRow> newParquetIterable(
-      EncryptedInputFile inputFile,
+      InputFile file,
       long start,
       long length,
       Expression residual,
       Schema readSchema,
       Map<Integer, ?> idToConstant) {
-    ByteBuffer fileEncryptionKey = null;
-    ByteBuffer aadPrefix = null;
-    if (inputFile.keyMetadata() != null && inputFile.keyMetadata().buffer() != null) {
-      StandardKeyMetadata keyMetadata = StandardKeyMetadata.parse(inputFile.keyMetadata().buffer());
-      fileEncryptionKey = keyMetadata.encryptionKey();
-      aadPrefix = keyMetadata.aadPrefix();
-    }
-
-    return Parquet.read(inputFile.encryptedInputFile())
+    return Parquet.read(file)
        .reuseContainers()
        .split(start, length)
        .project(readSchema)
@@ -117,8 +98,6 @@ private CloseableIterable<InternalRow> newParquetIterable(
        .filter(residual)
        .caseSensitive(caseSensitive())
        .withNameMapping(nameMapping())
-        .withFileEncryptionKey(fileEncryptionKey)
-        .withAADPrefix(aadPrefix)
        .build();
   }
 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 4e6e14b76d12..389ad1d5a2d9 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -100,7 +100,6 @@ protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
 
     return newBatchIterable(
             inputFile,
-            task.file().keyMetadata(),
             task.file().format(),
             task.start(),
             task.length(),
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index 307824028e81..572f955884a3 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -133,7 +133,6 @@ private CloseableIterable<InternalRow> rows(ContentScanTask<?> task, Sche
     Preconditions.checkNotNull(location, "Could not find InputFile");
     return newIterable(
         location,
-        task.file().keyMetadata(),
         task.file().format(),
         task.start(),
         task.length(),
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index ad034d744dc6..4b847474153c 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -92,7 +92,6 @@ protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
 
     return newIterable(
         inputFile,
-        task.file().keyMetadata(),
         task.file().format(),
         task.start(),
         task.length(),
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 0b11b3332b37..9356f62f3593 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -105,7 +105,6 @@ protected CloseableIterable<InternalRow> open(
         inputFile, "Could not find InputFile associated with FileScanTask");
     return newIterable(
         inputFile,
-        task.file().keyMetadata(),
         task.file().format(),
         task.start(),
         task.length(),
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index 8557ce331b3d..9df12fc060ae 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -162,8 +162,8 @@ private StructType lazyPosDeleteSparkType() {
   }
 
   @Override
-  public FileAppender<InternalRow> newAppender(OutputFile outputFile, FileFormat format) {
-    return newAppender(EncryptionUtil.plainAsEncryptedOutput(outputFile), format);
+  public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
+    return newAppender(EncryptionUtil.plainAsEncryptedOutput(file), fileFormat);
   }
 
   @Override