From efb20c6583361c2168323450a90ff577a7b857ac Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Wed, 26 May 2021 16:34:43 +0300 Subject: [PATCH 1/2] call native encryption API --- .../iceberg/io/FileAppenderFactory.java | 5 +++ .../iceberg/data/GenericAppenderFactory.java | 31 ++++++++++++++++++- .../flink/sink/FlinkAppenderFactory.java | 30 +++++++++++++++++- .../spark/source/SparkAppenderFactory.java | 30 +++++++++++++++++- 4 files changed, 93 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java index b093eab447fe..bf14d6c38a96 100644 --- a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java @@ -41,6 +41,11 @@ public interface FileAppenderFactory { */ FileAppender newAppender(OutputFile outputFile, FileFormat fileFormat); + // TODO document this, or change the previous function + default FileAppender newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) { + return null; + } + /** * Create a new {@link DataWriter}. * diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index e70c3efb936b..43f164c00ebf 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -34,6 +34,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.NativeFileEncryptParams; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.OutputFile; @@ -85,6 +86,22 @@ public GenericAppenderFactory setAll(Map properties) { @Override public FileAppender newAppender(OutputFile outputFile, FileFormat fileFormat) { + return newAppender(outputFile, null, fileFormat); + } + + @Override + public FileAppender newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) { + NativeFileEncryptParams nativeEncryption = null; + if (outputFile.useNativeEncryption()) { + nativeEncryption = outputFile.nativeEncryptionParameters(); + } + + return newAppender(outputFile.encryptingOutputFile(), nativeEncryption, fileFormat); + } + + private FileAppender newAppender(OutputFile outputFile, + NativeFileEncryptParams nativeEncryption, + FileFormat fileFormat) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); try { switch (fileFormat) { @@ -104,6 +121,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo .setAll(config) .metricsConfig(metricsConfig) .overwrite() + .encryption(nativeEncryption) .build(); case ORC: @@ -127,7 +145,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo public org.apache.iceberg.io.DataWriter newDataWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { return new org.apache.iceberg.io.DataWriter<>( - newAppender(file.encryptingOutputFile(), format), format, + newAppender(file, format), format, file.encryptingOutputFile().location(), spec, partition, file.keyMetadata()); } @@ -139,6 +157,11 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile file, Preconditions.checkNotNull(eqDeleteRowSchema, "Equality delete row schema shouldn't be null when creating equality-delete writer"); + NativeFileEncryptParams nativeEncryption = null; + if (file.useNativeEncryption()) { + nativeEncryption = file.nativeEncryptionParameters(); + } + MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); try { switch (format) { @@ -165,6 +188,7 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile file, .withSpec(spec) .withKeyMetadata(file.keyMetadata()) .equalityFieldIds(equalityFieldIds) + .encryption(nativeEncryption) .buildEqualityWriter(); default: @@ -180,6 +204,10 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile file, public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); + NativeFileEncryptParams nativeEncryption = null; + if (file.useNativeEncryption()) { + nativeEncryption = file.nativeEncryptionParameters(); + } try { switch (format) { case AVRO: @@ -203,6 +231,7 @@ public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile file, .rowSchema(posDeleteRowSchema) .withSpec(spec) .withKeyMetadata(file.keyMetadata()) + .encryption(nativeEncryption) .buildPositionWriter(); default: diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java index e39d470ec7ed..4cb0f073d0a7 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java @@ -35,6 +35,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.NativeFileEncryptParams; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.data.FlinkAvroWriter; import org.apache.iceberg.flink.data.FlinkOrcWriter; @@ -94,6 +95,22 @@ private RowType lazyPosDeleteFlinkSchema() { @Override public FileAppender newAppender(OutputFile outputFile, FileFormat format) { + return newAppender(outputFile, null, format); + } + + @Override + public FileAppender newAppender(EncryptedOutputFile outputFile, FileFormat format) { + NativeFileEncryptParams nativeEncryption = null; + if (outputFile.useNativeEncryption()) { + nativeEncryption = outputFile.nativeEncryptionParameters(); + } + + return newAppender(outputFile.encryptingOutputFile(), nativeEncryption, format); + } + + private FileAppender newAppender(OutputFile outputFile, + NativeFileEncryptParams nativeEncryption, + FileFormat format) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); try { switch (format) { @@ -122,6 +139,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat forma .metricsConfig(metricsConfig) .schema(schema) .overwrite() + .encryption(nativeEncryption) .build(); default: @@ -135,7 +153,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat forma @Override public DataWriter newDataWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { return new DataWriter<>( - newAppender(file.encryptingOutputFile(), format), format, + newAppender(file, format), format, file.encryptingOutputFile().location(), spec, partition, file.keyMetadata()); } @@ -148,6 +166,10 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outpu "Equality delete row schema shouldn't be null when creating equality-delete writer"); MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); + NativeFileEncryptParams nativeEncryption = null; + if (outputFile.useNativeEncryption()) { + nativeEncryption = outputFile.nativeEncryptionParameters(); + } try { switch (format) { case AVRO: @@ -173,6 +195,7 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outpu .withSpec(spec) .withKeyMetadata(outputFile.keyMetadata()) .equalityFieldIds(equalityFieldIds) + .encryption(nativeEncryption) .buildEqualityWriter(); default: @@ -188,6 +211,10 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outpu public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); + NativeFileEncryptParams nativeEncryption = null; + if (outputFile.useNativeEncryption()) { + nativeEncryption = outputFile.nativeEncryptionParameters(); + } try { switch (format) { case AVRO: @@ -213,6 +240,7 @@ public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outp .withSpec(spec) .withKeyMetadata(outputFile.keyMetadata()) .transformPaths(path -> StringData.fromString(path.toString())) + .encryption(nativeEncryption) .buildPositionWriter(); default: diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java index 29bb4edea174..6874585a301d 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java @@ -32,6 +32,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.NativeFileEncryptParams; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.DeleteSchemaUtil; @@ -149,6 +150,22 @@ private StructType lazyPosDeleteSparkType() { @Override public FileAppender newAppender(OutputFile file, FileFormat fileFormat) { + return newAppender(file, null, fileFormat); + } + + @Override + public FileAppender newAppender(EncryptedOutputFile file, FileFormat fileFormat) { + NativeFileEncryptParams nativeEncryption = null; + if (file.useNativeEncryption()) { + nativeEncryption = file.nativeEncryptionParameters(); + } + + return newAppender(file.encryptingOutputFile(), nativeEncryption, fileFormat); + } + + private FileAppender newAppender(OutputFile file, + NativeFileEncryptParams nativeEncryption, + FileFormat fileFormat) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties); try { switch (fileFormat) { @@ -159,6 +176,7 @@ public FileAppender newAppender(OutputFile file, FileFormat fileFor .metricsConfig(metricsConfig) .schema(writeSchema) .overwrite() + .encryption(nativeEncryption) .build(); case AVRO: @@ -188,7 +206,7 @@ public FileAppender newAppender(OutputFile file, FileFormat fileFor @Override public DataWriter newDataWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { - return new DataWriter<>(newAppender(file.encryptingOutputFile(), format), format, + return new DataWriter<>(newAppender(file, format), format, file.encryptingOutputFile().location(), spec, partition, file.keyMetadata()); } @@ -199,6 +217,10 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile f "Equality field ids shouldn't be null or empty when creating equality-delete writer"); Preconditions.checkNotNull(eqDeleteRowSchema, "Equality delete row schema shouldn't be null when creating equality-delete writer"); + NativeFileEncryptParams nativeEncryption = null; + if (file.useNativeEncryption()) { + nativeEncryption = file.nativeEncryptionParameters(); + } try { switch (format) { @@ -211,6 +233,7 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile f .withPartition(partition) .equalityFieldIds(equalityFieldIds) .withKeyMetadata(file.keyMetadata()) + .encryption(nativeEncryption) .buildEqualityWriter(); case AVRO: @@ -236,6 +259,10 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile f @Override public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { + NativeFileEncryptParams nativeEncryption = null; + if (file.useNativeEncryption()) { + nativeEncryption = file.nativeEncryptionParameters(); + } try { switch (format) { case PARQUET: @@ -249,6 +276,7 @@ public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile .withPartition(partition) .withKeyMetadata(file.keyMetadata()) .transformPaths(path -> UTF8String.fromString(path.toString())) + .encryption(nativeEncryption) .buildPositionWriter(); case AVRO: From 626924e7c15e5f6b0c401efaf8c5bc1cc4b3f0a8 Mon Sep 17 00:00:00 2001 From: Gidon Gershinsky Date: Thu, 4 Nov 2021 16:10:45 +0200 Subject: [PATCH 2/2] updates after review round 1 --- .../iceberg/BaseMetastoreTableOperations.java | 102 ++++++++++++++++++ .../iceberg/hadoop/HadoopTableOperations.java | 21 ++++ .../iceberg/data/GenericAppenderFactory.java | 10 +- .../flink/sink/FlinkAppenderFactory.java | 10 +- .../spark/source/SparkAppenderFactory.java | 12 ++- 5 files changed, 140 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index e81a148627ac..e0d360354d06 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -19,11 +19,21 @@ package org.apache.iceberg; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; +import org.apache.iceberg.encryption.EncryptionAlgorithm; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EnvelopeConfig; +import org.apache.iceberg.encryption.EnvelopeEncryptionManager; +import org.apache.iceberg.encryption.EnvelopeEncryptionSpec; +import org.apache.iceberg.encryption.EnvelopeKeyManager; +import org.apache.iceberg.encryption.KmsClient; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; +import org.apache.iceberg.encryption.TableEnvelopeKeyManager; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.FileIO; @@ -315,6 +325,98 @@ private String newTableMetadataFilePath(TableMetadata meta, int newVersion) { return metadataFileLocation(meta, String.format("%05d-%s%s", newVersion, UUID.randomUUID(), fileExtension)); } + @SuppressWarnings("checkstyle:CyclomaticComplexity") + public static EncryptionManager createEncryptionManager(TableMetadata tableMetadata, + Map extraKmsProperties) { + Map tableProperties = tableMetadata.properties(); + + String keyManagerType = PropertyUtil.propertyAsString(tableProperties, + TableProperties.ENCRYPTION_MANAGER_TYPE, null); + String tableKeyId = PropertyUtil.propertyAsString(tableProperties, + TableProperties.ENCRYPTION_TABLE_KEY, null); + + boolean encryptedTable = false; + + if (null == keyManagerType) { + if (null != tableKeyId) { + encryptedTable = true; + // Default: standard envelope encryption + keyManagerType = TableProperties.ENCRYPTION_MANAGER_TYPE_SINGLE_ENVELOPE; + } + } else { + if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_PLAINTEXT)) { + Preconditions.checkArgument(tableKeyId == null, + "Table encryption key set for unencrypted table"); + } else if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_LEGACY)) { + Preconditions.checkArgument(tableKeyId == null, + "Table encryption key set for table encrypted with legacy encryption manager"); + encryptedTable = true; + } else { + Preconditions.checkArgument(tableKeyId != null, + "Table encryption key is not set for encrypted table. " + + "Key manager type: " + keyManagerType); + encryptedTable = true; + } + } + + if (!encryptedTable) { + return new PlaintextEncryptionManager(); + } + + if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_LEGACY)) { + // TODO load custom EncryptionManager. Needed? + } else if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_SINGLE_ENVELOPE)) { + Schema tableSchema = tableMetadata.schema(); + + boolean pushdown = PropertyUtil.propertyAsBoolean(tableProperties, + TableProperties.ENCRYPTION_PUSHDOWN_ENABLED, TableProperties.ENCRYPTION_PUSHDOWN_ENABLED_DEFAULT); + + String dataEncryptionAlgorithm = PropertyUtil.propertyAsString(tableProperties, + TableProperties.ENCRYPTION_DATA_ALGORITHM, TableProperties.ENCRYPTION_DATA_ALGORITHM_DEFAULT); + + EnvelopeConfig dataFileConfig = EnvelopeConfig.builderFor(tableSchema) + .singleWrap(tableKeyId) + .useAlgorithm(EncryptionAlgorithm.valueOf(dataEncryptionAlgorithm)) + .build(); + + String kmsClientImpl = PropertyUtil.propertyAsString(tableProperties, + TableProperties.ENCRYPTION_KMS_CLIENT_IMPL, null); + + // Pass custom kms configuration from table and additional properties + Map kmsProperties = new HashMap<>(); + for (Map.Entry property : tableProperties.entrySet()) { + if (property.getKey().contains("kms.client")) { // TODO + kmsProperties.put(property.getKey(), property.getValue()); + } + } + + for (Map.Entry property : extraKmsProperties.entrySet()) { + if (property.getKey().contains("kms.client")) { // TODO + kmsProperties.put(property.getKey(), property.getValue()); + } + } + + KmsClient kmsClient = TableEnvelopeKeyManager.loadKmsClient(kmsClientImpl, kmsProperties); + + int dataKeyLength = -1; + if (!kmsClient.supportsKeyGeneration()) { + dataKeyLength = PropertyUtil.propertyAsInt(tableProperties, + TableProperties.ENCRYPTION_DEK_LENGTH, TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); + } + + EnvelopeKeyManager keyManager = new TableEnvelopeKeyManager(kmsClient, dataFileConfig, pushdown, + tableSchema, dataKeyLength); + + // TODO add manifest/list encr specs. Post-MVP(?) + EnvelopeEncryptionSpec spec = EnvelopeEncryptionSpec.builderFor(tableSchema) + .addDataFileConfig(dataFileConfig).build(); + + return new EnvelopeEncryptionManager(pushdown, spec, keyManager); + } + + throw new UnsupportedOperationException("Unsupported encryption manager type " + keyManagerType); + } + private static int parseVersion(String metadataLocation) { int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0 int versionEnd = metadataLocation.indexOf('-', versionStart); diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java index 496f95f10fbe..3f4ac56c09c4 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.regex.Matcher; @@ -32,6 +34,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.LocationProviders; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; @@ -67,6 +70,7 @@ public class HadoopTableOperations implements TableOperations { private volatile TableMetadata currentMetadata = null; private volatile Integer version = null; private volatile boolean shouldRefresh = true; + private volatile EncryptionManager encryptionManager = null; protected HadoopTableOperations(Path location, FileIO fileIO, Configuration conf) { this.conf = conf; @@ -82,6 +86,23 @@ public TableMetadata current() { return currentMetadata; } + @Override + public EncryptionManager encryption() { + // TODO run by single thread? or synchronize? + if (null == encryptionManager) { + // get KMS client properties from Hadoop config + Map extraKmsProperties = new HashMap<>(); + for (Map.Entry property : conf) { + if (property.getKey().contains("kms.client")) { // TODO + extraKmsProperties.put(property.getKey(), property.getValue()); + } + } + encryptionManager = BaseMetastoreTableOperations.createEncryptionManager(current(), extraKmsProperties); + } + + return encryptionManager; + } + private synchronized Pair versionAndMetadata() { return Pair.of(version, currentMetadata); } diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index 43f164c00ebf..150bac69866a 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -34,7 +34,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; -import org.apache.iceberg.encryption.NativeFileEncryptParams; +import org.apache.iceberg.encryption.NativeFileCryptoParameters; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.OutputFile; @@ -91,7 +91,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo @Override public FileAppender newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) { - NativeFileEncryptParams nativeEncryption = null; + NativeFileCryptoParameters nativeEncryption = null; if (outputFile.useNativeEncryption()) { nativeEncryption = outputFile.nativeEncryptionParameters(); } @@ -100,7 +100,7 @@ public FileAppender newAppender(EncryptedOutputFile outputFile, FileForm } private FileAppender newAppender(OutputFile outputFile, - NativeFileEncryptParams nativeEncryption, + NativeFileCryptoParameters nativeEncryption, FileFormat fileFormat) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); try { @@ -157,7 +157,7 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile file, Preconditions.checkNotNull(eqDeleteRowSchema, "Equality delete row schema shouldn't be null when creating equality-delete writer"); - NativeFileEncryptParams nativeEncryption = null; + NativeFileCryptoParameters nativeEncryption = null; if (file.useNativeEncryption()) { nativeEncryption = file.nativeEncryptionParameters(); } @@ -204,7 +204,7 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile file, public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); - NativeFileEncryptParams nativeEncryption = null; + NativeFileCryptoParameters nativeEncryption = null; if (file.useNativeEncryption()) { nativeEncryption = file.nativeEncryptionParameters(); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java index 4cb0f073d0a7..4c2e9a46d379 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java @@ -35,7 +35,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; -import org.apache.iceberg.encryption.NativeFileEncryptParams; +import org.apache.iceberg.encryption.NativeFileCryptoParameters; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.data.FlinkAvroWriter; import org.apache.iceberg.flink.data.FlinkOrcWriter; @@ -100,7 +100,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat forma @Override public FileAppender newAppender(EncryptedOutputFile outputFile, FileFormat format) { - NativeFileEncryptParams nativeEncryption = null; + NativeFileCryptoParameters nativeEncryption = null; if (outputFile.useNativeEncryption()) { nativeEncryption = outputFile.nativeEncryptionParameters(); } @@ -109,7 +109,7 @@ public FileAppender newAppender(EncryptedOutputFile outputFile, FileFor } private FileAppender newAppender(OutputFile outputFile, - NativeFileEncryptParams nativeEncryption, + NativeFileCryptoParameters nativeEncryption, FileFormat format) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); try { @@ -166,7 +166,7 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outpu "Equality delete row schema shouldn't be null when creating equality-delete writer"); MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); - NativeFileEncryptParams nativeEncryption = null; + NativeFileCryptoParameters nativeEncryption = null; if (outputFile.useNativeEncryption()) { nativeEncryption = outputFile.nativeEncryptionParameters(); } @@ -211,7 +211,7 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outpu public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); - NativeFileEncryptParams nativeEncryption = null; + NativeFileCryptoParameters nativeEncryption = null; if (outputFile.useNativeEncryption()) { nativeEncryption = outputFile.nativeEncryptionParameters(); } diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java index 6874585a301d..d1538778dc33 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java @@ -32,7 +32,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; -import org.apache.iceberg.encryption.NativeFileEncryptParams; +import org.apache.iceberg.encryption.NativeFileCryptoParameters; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.DeleteSchemaUtil; @@ -155,7 +155,7 @@ public FileAppender newAppender(OutputFile file, FileFormat fileFor @Override public FileAppender newAppender(EncryptedOutputFile file, FileFormat fileFormat) { - NativeFileEncryptParams nativeEncryption = null; + NativeFileCryptoParameters nativeEncryption = null; if (file.useNativeEncryption()) { nativeEncryption = file.nativeEncryptionParameters(); } @@ -164,7 +164,7 @@ public FileAppender newAppender(EncryptedOutputFile file, FileForma } private FileAppender newAppender(OutputFile file, - NativeFileEncryptParams nativeEncryption, + NativeFileCryptoParameters nativeEncryption, FileFormat fileFormat) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties); try { @@ -217,7 +217,8 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile f "Equality field ids shouldn't be null or empty when creating equality-delete writer"); Preconditions.checkNotNull(eqDeleteRowSchema, "Equality delete row schema shouldn't be null when creating equality-delete writer"); - NativeFileEncryptParams nativeEncryption = null; + + NativeFileCryptoParameters nativeEncryption = null; if (file.useNativeEncryption()) { nativeEncryption = file.nativeEncryptionParameters(); } @@ -259,10 +260,11 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile f @Override public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { - NativeFileEncryptParams nativeEncryption = null; + NativeFileCryptoParameters nativeEncryption = null; if (file.useNativeEncryption()) { nativeEncryption = file.nativeEncryptionParameters(); } + try { switch (format) { case PARQUET: