diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java
index 1686342c776d..300c88d18862 100644
--- a/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java
+++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptedOutputFile.java
@@ -37,4 +37,9 @@ public interface EncryptedOutputFile {
    * #encryptingOutputFile()}.
    */
   EncryptionKeyMetadata keyMetadata();
+
+  /** Underlying output file for native encryption. */
+  default OutputFile plainOutputFile() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
 }
diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptionKeyMetadata.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptionKeyMetadata.java
index 1ce1c337a809..f620c46e563e 100644
--- a/api/src/main/java/org/apache/iceberg/encryption/EncryptionKeyMetadata.java
+++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptionKeyMetadata.java
@@ -39,6 +39,11 @@ public ByteBuffer buffer() {
         public EncryptionKeyMetadata copy() {
           return this;
         }
+
+        @Override
+        public ByteBuffer encryptionKey() {
+          return null;
+        }
       };
 
   static EncryptionKeyMetadata empty() {
@@ -49,4 +54,12 @@ static EncryptionKeyMetadata empty() {
   ByteBuffer buffer();
 
   EncryptionKeyMetadata copy();
+
+  default ByteBuffer encryptionKey() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  default ByteBuffer aadPrefix() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java
index c0fc41ca1385..912104a5305d 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptedFiles.java
@@ -57,5 +57,9 @@ public static EncryptedOutputFile encryptedOutput(
         encryptedOutputFile, BaseEncryptionKeyMetadata.fromByteArray(keyMetadata));
   }
 
+  public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) {
+    return new BaseEncryptedOutputFile(encryptingOutputFile, EncryptionKeyMetadata.empty());
+  }
+
   private EncryptedFiles() {}
 }
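Reviewer note — a minimal usage sketch (not part of the patch) of the new
plainAsEncryptedOutput helper; the local path is arbitrary:

    import org.apache.iceberg.Files;
    import org.apache.iceberg.encryption.EncryptedFiles;
    import org.apache.iceberg.encryption.EncryptedOutputFile;
    import org.apache.iceberg.io.OutputFile;

    public class PlainAsEncryptedExample {
      public static void main(String[] args) {
        OutputFile plain = Files.localOutput("/tmp/data.parquet");
        // Wraps the file with EncryptionKeyMetadata.empty() so it can flow
        // through the EncryptedOutputFile-based writer paths with no encryption.
        EncryptedOutputFile wrapped = EncryptedFiles.plainAsEncryptedOutput(plain);
        System.out.println(wrapped.encryptingOutputFile().location());
        System.out.println(wrapped.keyMetadata().buffer()); // null: empty metadata
      }
    }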
diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
new file mode 100644
index 000000000000..ad1deecb8da1
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.encryption;
+
+import java.util.Map;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class EncryptionUtil {
+
+  private EncryptionUtil() {}
+
+  public static KeyManagementClient createKmsClient(Map<String, String> catalogProperties) {
+    String kmsType = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_TYPE);
+    String kmsImpl = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_IMPL);
+
+    Preconditions.checkArgument(
+        kmsType == null || kmsImpl == null,
+        "Cannot set both KMS type (%s) and KMS impl (%s)",
+        kmsType,
+        kmsImpl);
+
+    // TODO: Add KMS implementations
+    Preconditions.checkArgument(kmsType == null, "Unsupported KMS type: %s", kmsType);
+
+    KeyManagementClient kmsClient;
+    DynConstructors.Ctor<KeyManagementClient> ctor;
+    try {
+      ctor = DynConstructors.builder(KeyManagementClient.class).impl(kmsImpl).buildChecked();
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot initialize KeyManagementClient, missing no-arg constructor for class %s",
+              kmsImpl),
+          e);
+    }
+
+    try {
+      kmsClient = ctor.newInstance();
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot initialize KeyManagementClient, %s does not implement the KeyManagementClient interface",
+              kmsImpl),
+          e);
+    }
+
+    kmsClient.initialize(catalogProperties);
+
+    return kmsClient;
+  }
+
+  public static EncryptionManager createEncryptionManager(
+      Map<String, String> tableProperties, KeyManagementClient kmsClient) {
+    Preconditions.checkArgument(kmsClient != null, "Invalid KMS client: null");
+    String tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY);
+
+    if (null == tableKeyId) {
+      // Unencrypted table
+      return PlaintextEncryptionManager.instance();
+    }
+
+    String fileFormat =
+        PropertyUtil.propertyAsString(
+            tableProperties,
+            TableProperties.DEFAULT_FILE_FORMAT,
+            TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+
+    if (FileFormat.fromString(fileFormat) != FileFormat.PARQUET) {
+      throw new UnsupportedOperationException(
+          "Iceberg encryption currently supports only Parquet format for data files");
+    }
+
+    int dataKeyLength =
+        PropertyUtil.propertyAsInt(
+            tableProperties,
+            TableProperties.ENCRYPTION_DEK_LENGTH,
+            TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT);
+
+    Preconditions.checkState(
+        dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32,
+        "Invalid data key length: %s (must be 16, 24, or 32)",
+        dataKeyLength);
+
+    return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient);
+  }
+}
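Reviewer note — a sketch (not part of the patch) of the intended wiring; the
com.example.InMemoryKmsClient class name is a hypothetical stand-in for any
KeyManagementClient implementation with a no-arg constructor:

    import java.util.Map;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.encryption.EncryptionManager;
    import org.apache.iceberg.encryption.EncryptionUtil;
    import org.apache.iceberg.encryption.KeyManagementClient;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    public class EncryptionWiring {
      public static void main(String[] args) {
        Map<String, String> catalogProps =
            ImmutableMap.of(
                CatalogProperties.ENCRYPTION_KMS_IMPL, "com.example.InMemoryKmsClient");
        KeyManagementClient kms = EncryptionUtil.createKmsClient(catalogProps);

        // With no table key set this would return the plaintext manager; with
        // one set (and the default Parquet file format) it returns a
        // StandardEncryptionManager backed by the KMS client.
        Map<String, String> tableProps =
            ImmutableMap.of(TableProperties.ENCRYPTION_TABLE_KEY, "kms-key-id-1");
        EncryptionManager manager = EncryptionUtil.createEncryptionManager(tableProps, kms);
      }
    }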
diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java
deleted file mode 100644
index 0d7ec43f6ebc..000000000000
--- a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.encryption;
-
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.apache.iceberg.types.Types.NestedField.required;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.avro.AvroSchemaUtil;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.types.Types;
-
-class KeyMetadata implements EncryptionKeyMetadata, IndexedRecord {
-  private static final byte V1 = 1;
-  private static final Schema SCHEMA_V1 =
-      new Schema(
-          required(0, "encryption_key", Types.BinaryType.get()),
-          optional(1, "aad_prefix", Types.BinaryType.get()));
-  private static final org.apache.avro.Schema AVRO_SCHEMA_V1 =
-      AvroSchemaUtil.convert(SCHEMA_V1, KeyMetadata.class.getCanonicalName());
-
-  private static final Map<Byte, Schema> schemaVersions = ImmutableMap.of(V1, SCHEMA_V1);
-  private static final Map<Byte, org.apache.avro.Schema> avroSchemaVersions =
-      ImmutableMap.of(V1, AVRO_SCHEMA_V1);
-
-  private static final KeyMetadataEncoder KEY_METADATA_ENCODER = new KeyMetadataEncoder(V1);
-  private static final KeyMetadataDecoder KEY_METADATA_DECODER = new KeyMetadataDecoder(V1);
-
-  private ByteBuffer encryptionKey;
-  private ByteBuffer aadPrefix;
-  private org.apache.avro.Schema avroSchema;
-
-  /** Used by Avro reflection to instantiate this class * */
-  KeyMetadata() {}
-
-  KeyMetadata(ByteBuffer encryptionKey, ByteBuffer aadPrefix) {
-    this.encryptionKey = encryptionKey;
-    this.aadPrefix = aadPrefix;
-    this.avroSchema = AVRO_SCHEMA_V1;
-  }
-
-  static Map<Byte, Schema> supportedSchemaVersions() {
-    return schemaVersions;
-  }
-
-  static Map<Byte, org.apache.avro.Schema> supportedAvroSchemaVersions() {
-    return avroSchemaVersions;
-  }
-
-  ByteBuffer encryptionKey() {
-    return encryptionKey;
-  }
-
-  ByteBuffer aadPrefix() {
-    return aadPrefix;
-  }
-
-  static KeyMetadata parse(ByteBuffer buffer) {
-    try {
-      return KEY_METADATA_DECODER.decode(buffer);
-    } catch (IOException e) {
-      throw new UncheckedIOException("Failed to parse envelope encryption metadata", e);
-    }
-  }
-
-  @Override
-  public ByteBuffer buffer() {
-    try {
-      return KEY_METADATA_ENCODER.encode(this);
-    } catch (IOException e) {
-      throw new UncheckedIOException("Failed to serialize envelope key metadata", e);
-    }
-  }
-
-  @Override
-  public EncryptionKeyMetadata copy() {
-    KeyMetadata metadata = new KeyMetadata(encryptionKey(), aadPrefix());
-    return metadata;
-  }
-
-  @Override
-  public void put(int i, Object v) {
-    switch (i) {
-      case 0:
-        this.encryptionKey = (ByteBuffer) v;
-        return;
-      case 1:
-        this.aadPrefix = (ByteBuffer) v;
-        return;
-      default:
-        // ignore the object, it must be from a newer version of the format
-    }
-  }
-
-  @Override
-  public Object get(int i) {
-    switch (i) {
-      case 0:
-        return encryptionKey;
-      case 1:
-        return aadPrefix;
-      default:
-        throw new UnsupportedOperationException("Unknown field ordinal: " + i);
-    }
-  }
-
-  @Override
-  public org.apache.avro.Schema getSchema() {
-    return avroSchema;
-  }
-}
diff --git a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
index 59b0b4b3bf6a..1b320cae64e0 100644
--- a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
+++ b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
@@ -40,6 +40,17 @@ public interface FileAppenderFactory<T> {
    */
   FileAppender<T> newAppender(OutputFile outputFile, FileFormat fileFormat);
 
+  /**
+   * Create a new {@link FileAppender}.
+   *
+   * @param outputFile an EncryptedOutputFile used to create an output stream
+   * @param fileFormat file format
+   * @return a newly created {@link FileAppender}
+   */
+  default FileAppender<T> newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) {
+    return newAppender(outputFile.encryptingOutputFile(), fileFormat);
+  }
+
   /**
    * Create a new {@link DataWriter}.
    *
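Reviewer note — the default method keeps existing FileAppenderFactory
implementations source- and binary-compatible. A sketch (not part of the patch)
of a third-party factory that does not override it; StubFactory is a stand-in:

    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.io.FileAppender;
    import org.apache.iceberg.io.FileAppenderFactory;
    import org.apache.iceberg.io.OutputFile;

    abstract class StubFactory<T> implements FileAppenderFactory<T> {
      @Override
      public FileAppender<T> newAppender(OutputFile outputFile, FileFormat fileFormat) {
        throw new UnsupportedOperationException("stub");
      }

      // newAppender(EncryptedOutputFile, FileFormat) is intentionally not
      // overridden: callers transparently get
      // newAppender(outputFile.encryptingOutputFile(), fileFormat).
    }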
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
index 976b98b0a9fe..beede9fb5fe9 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
@@ -118,7 +118,7 @@ public DataWriter<T> newDataWriter(
 
       case PARQUET:
         Parquet.DataWriteBuilder parquetBuilder =
-            Parquet.writeData(outputFile)
+            Parquet.writeData(file)
                 .schema(dataSchema)
                 .setAll(properties)
                 .metricsConfig(metricsConfig)
@@ -186,7 +186,7 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(
 
       case PARQUET:
         Parquet.DeleteWriteBuilder parquetBuilder =
-            Parquet.writeDeletes(outputFile)
+            Parquet.writeDeletes(file)
                 .setAll(properties)
                 .metricsConfig(metricsConfig)
                 .rowSchema(equalityDeleteRowSchema)
@@ -254,7 +254,7 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(
 
       case PARQUET:
         Parquet.DeleteWriteBuilder parquetBuilder =
-            Parquet.writeDeletes(outputFile)
+            Parquet.writeDeletes(file)
                 .setAll(properties)
                 .metricsConfig(metricsConfig)
                 .rowSchema(positionDeleteRowSchema)
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index a7979fd2ed3e..8f5ff6ecf127 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -36,6 +36,8 @@
 import org.apache.iceberg.deletes.DeleteCounter;
 import org.apache.iceberg.deletes.Deletes;
 import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
@@ -293,6 +295,13 @@ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema deleteSchema) {
           builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath));
         }
 
+        if (deleteFile.keyMetadata() != null) {
+          EncryptionKeyMetadata keyMetadata =
+              EncryptionUtil.parseKeyMetadata(deleteFile.keyMetadata());
+          builder.withFileEncryptionKey(keyMetadata.encryptionKey());
+          builder.withAADPrefix(keyMetadata.aadPrefix());
+        }
+
         return builder.build();
 
       case ORC:
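Reviewer note — the hunk above is the read-side counterpart of native
encryption. A standalone sketch (not part of the patch) of the same pattern;
EncryptionUtil.parseKeyMetadata is referenced by this PR but defined outside
the hunks shown here:

    import java.nio.ByteBuffer;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.encryption.EncryptionKeyMetadata;
    import org.apache.iceberg.encryption.EncryptionUtil;
    import org.apache.iceberg.parquet.Parquet;

    class ReadSideDecryption {
      static Parquet.ReadBuilder configure(Parquet.ReadBuilder builder, DeleteFile deleteFile) {
        ByteBuffer keyMetadataBuffer = deleteFile.keyMetadata();
        if (keyMetadataBuffer != null) {
          // Decode the manifest-stored key metadata into the DEK and AAD prefix
          // the Parquet reader needs to decrypt the footer and pages.
          EncryptionKeyMetadata keyMetadata = EncryptionUtil.parseKeyMetadata(keyMetadataBuffer);
          builder.withFileEncryptionKey(keyMetadata.encryptionKey());
          builder.withAADPrefix(keyMetadata.aadPrefix());
        }
        return builder;
      }
    }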
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
index 23a94ebc9944..ae0c5405fe03 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
@@ -32,6 +32,7 @@
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileAppenderFactory;
@@ -84,7 +85,14 @@ public GenericAppenderFactory setAll(Map<String, String> properties) {
 
   @Override
   public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFormat) {
+    return newAppender(EncryptedFiles.plainAsEncryptedOutput(outputFile), fileFormat);
+  }
+
+  @Override
+  public FileAppender<Record> newAppender(
+      EncryptedOutputFile encryptedOutputFile, FileFormat fileFormat) {
     MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
+    OutputFile outputFile = encryptedOutputFile.encryptingOutputFile();
     try {
       switch (fileFormat) {
         case AVRO:
@@ -97,7 +105,7 @@ public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFormat) {
               .build();
 
         case PARQUET:
-          return Parquet.write(outputFile)
+          return Parquet.write(encryptedOutputFile)
               .schema(schema)
               .createWriterFunc(GenericParquetWriter::buildWriter)
               .setAll(config)
@@ -127,7 +135,7 @@ public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFormat) {
   public org.apache.iceberg.io.DataWriter<Record> newDataWriter(
       EncryptedOutputFile file, FileFormat format, StructLike partition) {
     return new org.apache.iceberg.io.DataWriter<>(
-        newAppender(file.encryptingOutputFile(), format),
+        newAppender(file, format),
         format,
         file.encryptingOutputFile().location(),
         spec,
@@ -146,10 +154,12 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
         "Equality delete row schema shouldn't be null when creating equality-delete writer");
 
     MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
+    OutputFile outputFile = file.encryptingOutputFile();
+
     try {
       switch (format) {
         case AVRO:
-          return Avro.writeDeletes(file.encryptingOutputFile())
+          return Avro.writeDeletes(outputFile)
              .createWriterFunc(DataWriter::create)
              .withPartition(partition)
              .overwrite()
@@ -161,7 +171,7 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
               .buildEqualityWriter();
 
         case ORC:
-          return ORC.writeDeletes(file.encryptingOutputFile())
+          return ORC.writeDeletes(outputFile)
               .createWriterFunc(GenericOrcWriter::buildWriter)
               .withPartition(partition)
               .overwrite()
@@ -174,7 +184,7 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
               .buildEqualityWriter();
 
         case PARQUET:
-          return Parquet.writeDeletes(file.encryptingOutputFile())
+          return Parquet.writeDeletes(file)
              .createWriterFunc(GenericParquetWriter::buildWriter)
              .withPartition(partition)
              .overwrite()
@@ -199,10 +209,12 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
   public PositionDeleteWriter<Record> newPosDeleteWriter(
       EncryptedOutputFile file, FileFormat format, StructLike partition) {
     MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
+    OutputFile outputFile = file.encryptingOutputFile();
+
    try {
      switch (format) {
        case AVRO:
-          return Avro.writeDeletes(file.encryptingOutputFile())
+          return Avro.writeDeletes(outputFile)
              .createWriterFunc(DataWriter::create)
              .withPartition(partition)
              .overwrite()
@@ -213,7 +225,7 @@ public PositionDeleteWriter<Record> newPosDeleteWriter(
               .buildPositionWriter();
 
         case ORC:
-          return ORC.writeDeletes(file.encryptingOutputFile())
+          return ORC.writeDeletes(outputFile)
               .createWriterFunc(GenericOrcWriter::buildWriter)
               .withPartition(partition)
               .overwrite()
@@ -224,7 +236,7 @@ public PositionDeleteWriter<Record> newPosDeleteWriter(
               .buildPositionWriter();
 
         case PARQUET:
-          return Parquet.writeDeletes(file.encryptingOutputFile())
+          return Parquet.writeDeletes(file)
               .createWriterFunc(GenericParquetWriter::buildWriter)
               .withPartition(partition)
               .overwrite()
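Reviewer note — a sketch (not part of the patch) of appending records through
the new EncryptedOutputFile-aware overload; with empty key metadata the Parquet
branch behaves exactly like the old plain-OutputFile path:

    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.Files;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericAppenderFactory;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.encryption.EncryptedFiles;
    import org.apache.iceberg.encryption.EncryptedOutputFile;
    import org.apache.iceberg.io.FileAppender;
    import org.apache.iceberg.types.Types;

    public class EncryptedAppendExample {
      public static void main(String[] args) throws Exception {
        Schema schema =
            new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
        EncryptedOutputFile out =
            EncryptedFiles.plainAsEncryptedOutput(Files.localOutput("/tmp/t.parquet"));

        try (FileAppender<Record> appender =
            new GenericAppenderFactory(schema).newAppender(out, FileFormat.PARQUET)) {
          Record record = GenericRecord.create(schema);
          record.setField("id", 1L);
          appender.add(record);
        }
      }
    }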
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericReader.java b/data/src/main/java/org/apache/iceberg/data/GenericReader.java
index bf2919f334a8..9f69bde1f897 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericReader.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericReader.java
@@ -29,6 +29,8 @@
 import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -126,6 +128,13 @@ private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProjection) {
           parquet.reuseContainers();
         }
 
+        if (task.file().keyMetadata() != null) {
+          EncryptionKeyMetadata keyMetadata =
+              EncryptionUtil.parseKeyMetadata(task.file().keyMetadata());
+          parquet.withFileEncryptionKey(keyMetadata.encryptionKey());
+          parquet.withAADPrefix(keyMetadata.aadPrefix());
+        }
+
         return parquet.build();
 
       case ORC:
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index b6f1392d1562..0456324ce833 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -34,6 +34,7 @@
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.data.FlinkAvroWriter;
@@ -99,7 +100,15 @@ private RowType lazyPosDeleteFlinkSchema() {
 
   @Override
   public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
+    return newAppender(EncryptedFiles.plainAsEncryptedOutput(outputFile), format);
+  }
+
+  @Override
+  public FileAppender<RowData> newAppender(
+      EncryptedOutputFile encryptedOutputFile, FileFormat format) {
     MetricsConfig metricsConfig = MetricsConfig.forTable(table);
+    OutputFile outputFile = encryptedOutputFile.encryptingOutputFile();
+
     try {
       switch (format) {
         case AVRO:
@@ -122,7 +131,7 @@ public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
               .build();
 
         case PARQUET:
-          return Parquet.write(outputFile)
+          return Parquet.write(encryptedOutputFile)
               .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType))
               .setAll(props)
               .metricsConfig(metricsConfig)
@@ -142,7 +151,7 @@ public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
   public DataWriter<RowData> newDataWriter(
       EncryptedOutputFile file, FileFormat format, StructLike partition) {
     return new DataWriter<>(
-        newAppender(file.encryptingOutputFile(), format),
+        newAppender(file, format),
         format,
         file.encryptingOutputFile().location(),
         spec,
@@ -191,7 +200,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
               .buildEqualityWriter();
 
         case PARQUET:
-          return Parquet.writeDeletes(outputFile.encryptingOutputFile())
+          return Parquet.writeDeletes(outputFile)
               .createWriterFunc(
                   msgType -> FlinkParquetWriters.buildWriter(lazyEqDeleteFlinkSchema(), msgType))
               .withPartition(partition)
@@ -250,7 +259,7 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(
         case PARQUET:
           RowType flinkPosDeleteSchema =
               FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
-          return Parquet.writeDeletes(outputFile.encryptingOutputFile())
+          return Parquet.writeDeletes(outputFile)
              .createWriterFunc(
                  msgType -> FlinkParquetWriters.buildWriter(flinkPosDeleteSchema, msgType))
              .withPartition(partition)
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index 88364f4e87b1..20ce80b62bfc 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -29,6 +29,8 @@
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.encryption.InputFilesDecryptor;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -182,6 +184,13 @@ private CloseableIterable<RowData> newParquetIterable(
       builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
     }
 
+    if (task.file().keyMetadata() != null) {
+      EncryptionKeyMetadata keyMetadata =
+          EncryptionUtil.parseKeyMetadata(task.file().keyMetadata());
+      builder.withFileEncryptionKey(keyMetadata.encryptionKey());
+      builder.withAADPrefix(keyMetadata.aadPrefix());
+    }
+
     return builder.build();
   }
 
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index a95454b8b0ee..b2bfa1399ca7 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -56,8 +56,9 @@
 import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
-import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
 import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -302,10 +303,7 @@ public void close() throws IOException {
 
     private CloseableIterable<T> openTask(FileScanTask currentTask, Schema readSchema) {
       DataFile file = currentTask.file();
-      InputFile inputFile =
-          encryptionManager.decrypt(
-              EncryptedFiles.encryptedInput(
-                  io.newInputFile(file.path().toString()), file.keyMetadata()));
+      InputFile inputFile = io.newInputFile(file.path().toString());
 
       CloseableIterable<T> iterable;
       switch (file.format()) {
@@ -419,6 +417,14 @@ private CloseableIterable<T> newParquetIterable(
       if (nameMapping != null) {
         parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
       }
+
+      if (task.file().keyMetadata() != null) {
+        EncryptionKeyMetadata keyMetadata =
+            EncryptionUtil.parseKeyMetadata(task.file().keyMetadata());
+        parquetReadBuilder.withFileEncryptionKey(keyMetadata.encryptionKey());
+        parquetReadBuilder.withAADPrefix(keyMetadata.aadPrefix());
+      }
+
       parquetReadBuilder.createReaderFunc(
           fileSchema ->
               GenericParquetReaders.buildReader(
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index d240c84b9e4d..0d5b5694f971 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -71,7 +71,9 @@
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.hadoop.HadoopInputFile;
@@ -125,6 +127,16 @@ public static WriteBuilder write(OutputFile file) {
     return new WriteBuilder(file);
   }
 
+  public static WriteBuilder write(EncryptedOutputFile file) {
+    if (EncryptionUtil.useNativeEncryption(file.keyMetadata())) {
+      return write(file.plainOutputFile())
+          .withFileEncryptionKey(file.keyMetadata().encryptionKey())
+          .withAADPrefix(file.keyMetadata().aadPrefix());
+    } else {
+      return write(file.encryptingOutputFile());
+    }
+  }
+
   public static class WriteBuilder {
     private final OutputFile file;
     private final Configuration conf;
@@ -608,6 +620,16 @@ public static DataWriteBuilder writeData(OutputFile file) {
     return new DataWriteBuilder(file);
   }
 
+  public static DataWriteBuilder writeData(EncryptedOutputFile file) {
+    if (EncryptionUtil.useNativeEncryption(file.keyMetadata())) {
+      return writeData(file.plainOutputFile())
+          .withFileEncryptionKey(file.keyMetadata().encryptionKey())
+          .withAADPrefix(file.keyMetadata().aadPrefix());
+    } else {
+      return writeData(file.encryptingOutputFile());
+    }
+  }
+
   public static class DataWriteBuilder {
     private final WriteBuilder appenderBuilder;
     private final String location;
@@ -715,6 +737,16 @@ public static DeleteWriteBuilder writeDeletes(OutputFile file) {
     return new DeleteWriteBuilder(file);
   }
 
+  public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) {
+    if (EncryptionUtil.useNativeEncryption(file.keyMetadata())) {
+      return writeDeletes(file.plainOutputFile())
+          .withFileEncryptionKey(file.keyMetadata().encryptionKey())
+          .withAADPrefix(file.keyMetadata().aadPrefix());
+    } else {
+      return writeDeletes(file.encryptingOutputFile());
+    }
+  }
+
   public static class DeleteWriteBuilder {
     private final WriteBuilder appenderBuilder;
     private final String location;
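Reviewer note — a caller-side sketch (not part of the patch) of the new write
overloads. For an EncryptedOutputFile whose key metadata qualifies as native
(EncryptionUtil.useNativeEncryption is referenced here but defined outside the
hunks shown), the builder comes back pre-configured with the file's DEK and AAD
prefix, so the Parquet writer encrypts the bytes itself:

    import org.apache.iceberg.Files;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.encryption.EncryptedOutputFile;
    import org.apache.iceberg.encryption.EncryptionManager;
    import org.apache.iceberg.parquet.Parquet;

    class NativeWriteExample {
      static Parquet.WriteBuilder newWriteBuilder(EncryptionManager em, Schema schema) {
        EncryptedOutputFile out = em.encrypt(Files.localOutput("/tmp/enc.parquet"));
        // Native key metadata -> plainOutputFile() plus file encryption key and
        // AAD prefix; otherwise -> the (possibly stream-wrapping) encryptingOutputFile().
        return Parquet.write(out).schema(schema);
      }
    }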
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index c05b694a60dc..0c51e9c4a9a6 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
@@ -26,11 +27,16 @@
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
@@ -53,6 +59,7 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
 
   protected CloseableIterable<ColumnarBatch> newBatchIterable(
       InputFile inputFile,
+      ByteBuffer keyMetadata,
       FileFormat format,
       long start,
       long length,
@@ -61,9 +68,17 @@ protected CloseableIterable<ColumnarBatch> newBatchIterable(
       SparkDeleteFilter deleteFilter) {
     switch (format) {
       case PARQUET:
-        return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);
+        return newParquetIterable(
+            EncryptedFiles.encryptedInput(inputFile, keyMetadata),
+            start,
+            length,
+            residual,
+            idToConstant,
+            deleteFilter);
 
       case ORC:
+        Preconditions.checkState(
+            keyMetadata == null, "Encryption currently not supported with ORC format");
         return newOrcIterable(inputFile, start, length, residual, idToConstant);
 
       default:
@@ -73,7 +88,7 @@ protected CloseableIterable<ColumnarBatch> newBatchIterable(
   }
 
   private CloseableIterable<ColumnarBatch> newParquetIterable(
-      InputFile inputFile,
+      EncryptedInputFile inputFile,
       long start,
       long length,
       Expression residual,
@@ -82,7 +97,16 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
     // get required schema if there are deletes
     Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
 
-    return Parquet.read(inputFile)
+    ByteBuffer fileEncryptionKey = null;
+    ByteBuffer aadPrefix = null;
+    if (inputFile.keyMetadata() != null && inputFile.keyMetadata().buffer() != null) {
+      EncryptionKeyMetadata keyMetadata =
+          EncryptionUtil.parseKeyMetadata(inputFile.keyMetadata().buffer());
+      fileEncryptionKey = keyMetadata.encryptionKey();
+      aadPrefix = keyMetadata.aadPrefix();
+    }
+
+    return Parquet.read(inputFile.encryptedInputFile())
         .project(requiredSchema)
         .split(start, length)
         .createBatchedReaderFunc(
@@ -97,6 +121,8 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
         // read performance as every batch read doesn't have to pay the cost of allocating memory.
         .reuseContainers()
         .withNameMapping(nameMapping())
+        .withFileEncryptionKey(fileEncryptionKey)
+        .withAADPrefix(aadPrefix)
         .build();
   }
 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
index 927084caea1c..40f62d834f4c 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
@@ -26,6 +27,10 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
@@ -50,6 +55,7 @@ abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow, T> {
 
   protected CloseableIterable<InternalRow> newIterable(
       InputFile file,
+      ByteBuffer encryptionKeyMetadata,
       FileFormat format,
       long start,
       long length,
@@ -58,7 +64,13 @@ protected CloseableIterable<InternalRow> newIterable(
       Map<Integer, ?> idToConstant) {
     switch (format) {
       case PARQUET:
-        return newParquetIterable(file, start, length, residual, projection, idToConstant);
+        return newParquetIterable(
+            EncryptedFiles.encryptedInput(file, encryptionKeyMetadata),
+            start,
+            length,
+            residual,
+            projection,
+            idToConstant);
 
       case AVRO:
         return newAvroIterable(file, start, length, projection, idToConstant);
@@ -83,13 +95,22 @@ private CloseableIterable<InternalRow> newAvroIterable(
   }
 
   private CloseableIterable<InternalRow> newParquetIterable(
-      InputFile file,
+      EncryptedInputFile inputFile,
       long start,
       long length,
       Expression residual,
       Schema readSchema,
       Map<Integer, ?> idToConstant) {
-    return Parquet.read(file)
+    ByteBuffer fileEncryptionKey = null;
+    ByteBuffer aadPrefix = null;
+    if (inputFile.keyMetadata() != null && inputFile.keyMetadata().buffer() != null) {
+      EncryptionKeyMetadata keyMetadata =
+          EncryptionUtil.parseKeyMetadata(inputFile.keyMetadata().buffer());
+      fileEncryptionKey = keyMetadata.encryptionKey();
+      aadPrefix = keyMetadata.aadPrefix();
+    }
+
+    return Parquet.read(inputFile.encryptedInputFile())
         .reuseContainers()
         .split(start, length)
         .project(readSchema)
@@ -98,6 +119,8 @@ private CloseableIterable<InternalRow> newParquetIterable(
         .filter(residual)
         .caseSensitive(caseSensitive())
         .withNameMapping(nameMapping())
+        .withFileEncryptionKey(fileEncryptionKey)
+        .withAADPrefix(aadPrefix)
         .build();
   }
 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 389ad1d5a2d9..4e6e14b76d12 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -100,6 +100,7 @@ protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
 
     return newBatchIterable(
             inputFile,
+            task.file().keyMetadata(),
             task.file().format(),
             task.start(),
             task.length(),
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index 572f955884a3..307824028e81 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -133,6 +133,7 @@ private CloseableIterable<InternalRow> rows(ContentScanTask<DataFile> task, Schema readSchema) {
     Preconditions.checkNotNull(location, "Could not find InputFile");
     return newIterable(
         location,
+        task.file().keyMetadata(),
         task.file().format(),
         task.start(),
         task.length(),
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index 4b847474153c..ad034d744dc6 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -92,6 +92,7 @@ protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
 
     return newIterable(
             inputFile,
+            task.file().keyMetadata(),
             task.file().format(),
             task.start(),
             task.length(),
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 9356f62f3593..0b11b3332b37 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -105,6 +105,7 @@ protected CloseableIterable<InternalRow> open(
         inputFile, "Could not find InputFile associated with FileScanTask");
     return newIterable(
         inputFile,
+        task.file().keyMetadata(),
         task.file().format(),
         task.start(),
         task.length(),
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index 6372edde0782..7549b7380563 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -30,6 +30,7 @@
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.DataWriter;
@@ -161,7 +162,12 @@ private StructType lazyPosDeleteSparkType() {
   }
 
   @Override
-  public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
+  public FileAppender<InternalRow> newAppender(OutputFile outputFile, FileFormat format) {
+    return newAppender(EncryptedFiles.plainAsEncryptedOutput(outputFile), format);
+  }
+
+  @Override
+  public FileAppender<InternalRow> newAppender(EncryptedOutputFile file, FileFormat fileFormat) {
     MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
     try {
       switch (fileFormat) {
@@ -175,7 +181,7 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
               .build();
 
         case AVRO:
-          return Avro.write(file)
+          return Avro.write(file.encryptingOutputFile())
               .createWriterFunc(ignored -> new SparkAvroWriter(dsSchema))
               .setAll(properties)
               .schema(writeSchema)
@@ -183,7 +189,7 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
               .build();
 
         case ORC:
-          return ORC.write(file)
+          return ORC.write(file.encryptingOutputFile())
              .createWriterFunc(SparkOrcWriter::new)
              .setAll(properties)
              .metricsConfig(metricsConfig)
@@ -203,7 +209,7 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
   public DataWriter<InternalRow> newDataWriter(
       EncryptedOutputFile file, FileFormat format, StructLike partition) {
     return new DataWriter<>(
-        newAppender(file.encryptingOutputFile(), format),
+        newAppender(file, format),
         format,
         file.encryptingOutputFile().location(),
         spec,
@@ -224,7 +230,7 @@ public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(
     try {
       switch (format) {
         case PARQUET:
-          return Parquet.writeDeletes(file.encryptingOutputFile())
+          return Parquet.writeDeletes(file)
               .createWriterFunc(
                   msgType -> SparkParquetWriters.buildWriter(lazyEqDeleteSparkType(), msgType))
               .overwrite()
@@ -274,7 +280,7 @@ public PositionDeleteWriter<InternalRow> newPosDeleteWriter(
         case PARQUET:
           StructType sparkPosDeleteSchema =
               SparkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
-          return Parquet.writeDeletes(file.encryptingOutputFile())
+          return Parquet.writeDeletes(file)
               .createWriterFunc(
                   msgType -> SparkParquetWriters.buildWriter(sparkPosDeleteSchema, msgType))
               .overwrite()