diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index e81a148627ac..e0d360354d06 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -19,11 +19,21 @@ package org.apache.iceberg; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; +import org.apache.iceberg.encryption.EncryptionAlgorithm; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EnvelopeConfig; +import org.apache.iceberg.encryption.EnvelopeEncryptionManager; +import org.apache.iceberg.encryption.EnvelopeEncryptionSpec; +import org.apache.iceberg.encryption.EnvelopeKeyManager; +import org.apache.iceberg.encryption.KmsClient; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; +import org.apache.iceberg.encryption.TableEnvelopeKeyManager; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.FileIO; @@ -315,6 +325,98 @@ private String newTableMetadataFilePath(TableMetadata meta, int newVersion) { return metadataFileLocation(meta, String.format("%05d-%s%s", newVersion, UUID.randomUUID(), fileExtension)); } + @SuppressWarnings("checkstyle:CyclomaticComplexity") + public static EncryptionManager createEncryptionManager(TableMetadata tableMetadata, + Map extraKmsProperties) { + Map tableProperties = tableMetadata.properties(); + + String keyManagerType = PropertyUtil.propertyAsString(tableProperties, + TableProperties.ENCRYPTION_MANAGER_TYPE, null); + String tableKeyId = PropertyUtil.propertyAsString(tableProperties, + TableProperties.ENCRYPTION_TABLE_KEY, null); + + boolean encryptedTable = false; + + if (null == keyManagerType) { + if (null != tableKeyId) { + encryptedTable = true; + // Default: standard envelope encryption + keyManagerType = TableProperties.ENCRYPTION_MANAGER_TYPE_SINGLE_ENVELOPE; + } + } else { + if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_PLAINTEXT)) { + Preconditions.checkArgument(tableKeyId == null, + "Table encryption key set for unencrypted table"); + } else if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_LEGACY)) { + Preconditions.checkArgument(tableKeyId == null, + "Table encryption key set for table encrypted with legacy encryption manager"); + encryptedTable = true; + } else { + Preconditions.checkArgument(tableKeyId != null, + "Table encryption key is not set for encrypted table. " + + "Key manager type: " + keyManagerType); + encryptedTable = true; + } + } + + if (!encryptedTable) { + return new PlaintextEncryptionManager(); + } + + if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_LEGACY)) { + // TODO load custom EncryptionManager. Needed? + } else if (keyManagerType.equals(TableProperties.ENCRYPTION_MANAGER_TYPE_SINGLE_ENVELOPE)) { + Schema tableSchema = tableMetadata.schema(); + + boolean pushdown = PropertyUtil.propertyAsBoolean(tableProperties, + TableProperties.ENCRYPTION_PUSHDOWN_ENABLED, TableProperties.ENCRYPTION_PUSHDOWN_ENABLED_DEFAULT); + + String dataEncryptionAlgorithm = PropertyUtil.propertyAsString(tableProperties, + TableProperties.ENCRYPTION_DATA_ALGORITHM, TableProperties.ENCRYPTION_DATA_ALGORITHM_DEFAULT); + + EnvelopeConfig dataFileConfig = EnvelopeConfig.builderFor(tableSchema) + .singleWrap(tableKeyId) + .useAlgorithm(EncryptionAlgorithm.valueOf(dataEncryptionAlgorithm)) + .build(); + + String kmsClientImpl = PropertyUtil.propertyAsString(tableProperties, + TableProperties.ENCRYPTION_KMS_CLIENT_IMPL, null); + + // Pass custom kms configuration from table and additional properties + Map kmsProperties = new HashMap<>(); + for (Map.Entry property : tableProperties.entrySet()) { + if (property.getKey().contains("kms.client")) { // TODO + kmsProperties.put(property.getKey(), property.getValue()); + } + } + + for (Map.Entry property : extraKmsProperties.entrySet()) { + if (property.getKey().contains("kms.client")) { // TODO + kmsProperties.put(property.getKey(), property.getValue()); + } + } + + KmsClient kmsClient = TableEnvelopeKeyManager.loadKmsClient(kmsClientImpl, kmsProperties); + + int dataKeyLength = -1; + if (!kmsClient.supportsKeyGeneration()) { + dataKeyLength = PropertyUtil.propertyAsInt(tableProperties, + TableProperties.ENCRYPTION_DEK_LENGTH, TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); + } + + EnvelopeKeyManager keyManager = new TableEnvelopeKeyManager(kmsClient, dataFileConfig, pushdown, + tableSchema, dataKeyLength); + + // TODO add manifest/list encr specs. Post-MVP(?) + EnvelopeEncryptionSpec spec = EnvelopeEncryptionSpec.builderFor(tableSchema) + .addDataFileConfig(dataFileConfig).build(); + + return new EnvelopeEncryptionManager(pushdown, spec, keyManager); + } + + throw new UnsupportedOperationException("Unsupported encryption manager type " + keyManagerType); + } + private static int parseVersion(String metadataLocation) { int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0 int versionEnd = metadataLocation.indexOf('-', versionStart); diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java index 496f95f10fbe..3f4ac56c09c4 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.regex.Matcher; @@ -32,6 +34,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.LocationProviders; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; @@ -67,6 +70,7 @@ public class HadoopTableOperations implements TableOperations { private volatile TableMetadata currentMetadata = null; private volatile Integer version = null; private volatile boolean shouldRefresh = true; + private volatile EncryptionManager encryptionManager = null; protected HadoopTableOperations(Path location, FileIO fileIO, Configuration conf) { this.conf = conf; @@ -82,6 +86,23 @@ public TableMetadata current() { return currentMetadata; } + @Override + public EncryptionManager encryption() { + // TODO run by single thread? or synchronize? + if (null == encryptionManager) { + // get KMS client properties from Hadoop config + Map extraKmsProperties = new HashMap<>(); + for (Map.Entry property : conf) { + if (property.getKey().contains("kms.client")) { // TODO + extraKmsProperties.put(property.getKey(), property.getValue()); + } + } + encryptionManager = BaseMetastoreTableOperations.createEncryptionManager(current(), extraKmsProperties); + } + + return encryptionManager; + } + private synchronized Pair versionAndMetadata() { return Pair.of(version, currentMetadata); } diff --git a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java index b093eab447fe..bf14d6c38a96 100644 --- a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java @@ -41,6 +41,11 @@ public interface FileAppenderFactory { */ FileAppender newAppender(OutputFile outputFile, FileFormat fileFormat); + // TODO document this, or change the previous function + default FileAppender newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) { + return null; + } + /** * Create a new {@link DataWriter}. * diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index e70c3efb936b..150bac69866a 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -34,6 +34,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.NativeFileCryptoParameters; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.OutputFile; @@ -85,6 +86,22 @@ public GenericAppenderFactory setAll(Map properties) { @Override public FileAppender newAppender(OutputFile outputFile, FileFormat fileFormat) { + return newAppender(outputFile, null, fileFormat); + } + + @Override + public FileAppender newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) { + NativeFileCryptoParameters nativeEncryption = null; + if (outputFile.useNativeEncryption()) { + nativeEncryption = outputFile.nativeEncryptionParameters(); + } + + return newAppender(outputFile.encryptingOutputFile(), nativeEncryption, fileFormat); + } + + private FileAppender newAppender(OutputFile outputFile, + NativeFileCryptoParameters nativeEncryption, + FileFormat fileFormat) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); try { switch (fileFormat) { @@ -104,6 +121,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo .setAll(config) .metricsConfig(metricsConfig) .overwrite() + .encryption(nativeEncryption) .build(); case ORC: @@ -127,7 +145,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo public org.apache.iceberg.io.DataWriter newDataWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { return new org.apache.iceberg.io.DataWriter<>( - newAppender(file.encryptingOutputFile(), format), format, + newAppender(file, format), format, file.encryptingOutputFile().location(), spec, partition, file.keyMetadata()); } @@ -139,6 +157,11 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile file, Preconditions.checkNotNull(eqDeleteRowSchema, "Equality delete row schema shouldn't be null when creating equality-delete writer"); + NativeFileCryptoParameters nativeEncryption = null; + if (file.useNativeEncryption()) { + nativeEncryption = file.nativeEncryptionParameters(); + } + MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); try { switch (format) { @@ -165,6 +188,7 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile file, .withSpec(spec) .withKeyMetadata(file.keyMetadata()) .equalityFieldIds(equalityFieldIds) + .encryption(nativeEncryption) .buildEqualityWriter(); default: @@ -180,6 +204,10 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile file, public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); + NativeFileCryptoParameters nativeEncryption = null; + if (file.useNativeEncryption()) { + nativeEncryption = file.nativeEncryptionParameters(); + } try { switch (format) { case AVRO: @@ -203,6 +231,7 @@ public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile file, .rowSchema(posDeleteRowSchema) .withSpec(spec) .withKeyMetadata(file.keyMetadata()) + .encryption(nativeEncryption) .buildPositionWriter(); default: diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java index e39d470ec7ed..4c2e9a46d379 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java @@ -35,6 +35,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.NativeFileCryptoParameters; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.data.FlinkAvroWriter; import org.apache.iceberg.flink.data.FlinkOrcWriter; @@ -94,6 +95,22 @@ private RowType lazyPosDeleteFlinkSchema() { @Override public FileAppender newAppender(OutputFile outputFile, FileFormat format) { + return newAppender(outputFile, null, format); + } + + @Override + public FileAppender newAppender(EncryptedOutputFile outputFile, FileFormat format) { + NativeFileCryptoParameters nativeEncryption = null; + if (outputFile.useNativeEncryption()) { + nativeEncryption = outputFile.nativeEncryptionParameters(); + } + + return newAppender(outputFile.encryptingOutputFile(), nativeEncryption, format); + } + + private FileAppender newAppender(OutputFile outputFile, + NativeFileCryptoParameters nativeEncryption, + FileFormat format) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); try { switch (format) { @@ -122,6 +139,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat forma .metricsConfig(metricsConfig) .schema(schema) .overwrite() + .encryption(nativeEncryption) .build(); default: @@ -135,7 +153,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat forma @Override public DataWriter newDataWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { return new DataWriter<>( - newAppender(file.encryptingOutputFile(), format), format, + newAppender(file, format), format, file.encryptingOutputFile().location(), spec, partition, file.keyMetadata()); } @@ -148,6 +166,10 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outpu "Equality delete row schema shouldn't be null when creating equality-delete writer"); MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); + NativeFileCryptoParameters nativeEncryption = null; + if (outputFile.useNativeEncryption()) { + nativeEncryption = outputFile.nativeEncryptionParameters(); + } try { switch (format) { case AVRO: @@ -173,6 +195,7 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outpu .withSpec(spec) .withKeyMetadata(outputFile.keyMetadata()) .equalityFieldIds(equalityFieldIds) + .encryption(nativeEncryption) .buildEqualityWriter(); default: @@ -188,6 +211,10 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile outpu public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outputFile, FileFormat format, StructLike partition) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); + NativeFileCryptoParameters nativeEncryption = null; + if (outputFile.useNativeEncryption()) { + nativeEncryption = outputFile.nativeEncryptionParameters(); + } try { switch (format) { case AVRO: @@ -213,6 +240,7 @@ public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile outp .withSpec(spec) .withKeyMetadata(outputFile.keyMetadata()) .transformPaths(path -> StringData.fromString(path.toString())) + .encryption(nativeEncryption) .buildPositionWriter(); default: diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java index 29bb4edea174..d1538778dc33 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java @@ -32,6 +32,7 @@ import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.NativeFileCryptoParameters; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.DeleteSchemaUtil; @@ -149,6 +150,22 @@ private StructType lazyPosDeleteSparkType() { @Override public FileAppender newAppender(OutputFile file, FileFormat fileFormat) { + return newAppender(file, null, fileFormat); + } + + @Override + public FileAppender newAppender(EncryptedOutputFile file, FileFormat fileFormat) { + NativeFileCryptoParameters nativeEncryption = null; + if (file.useNativeEncryption()) { + nativeEncryption = file.nativeEncryptionParameters(); + } + + return newAppender(file.encryptingOutputFile(), nativeEncryption, fileFormat); + } + + private FileAppender newAppender(OutputFile file, + NativeFileCryptoParameters nativeEncryption, + FileFormat fileFormat) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties); try { switch (fileFormat) { @@ -159,6 +176,7 @@ public FileAppender newAppender(OutputFile file, FileFormat fileFor .metricsConfig(metricsConfig) .schema(writeSchema) .overwrite() + .encryption(nativeEncryption) .build(); case AVRO: @@ -188,7 +206,7 @@ public FileAppender newAppender(OutputFile file, FileFormat fileFor @Override public DataWriter newDataWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { - return new DataWriter<>(newAppender(file.encryptingOutputFile(), format), format, + return new DataWriter<>(newAppender(file, format), format, file.encryptingOutputFile().location(), spec, partition, file.keyMetadata()); } @@ -200,6 +218,11 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile f Preconditions.checkNotNull(eqDeleteRowSchema, "Equality delete row schema shouldn't be null when creating equality-delete writer"); + NativeFileCryptoParameters nativeEncryption = null; + if (file.useNativeEncryption()) { + nativeEncryption = file.nativeEncryptionParameters(); + } + try { switch (format) { case PARQUET: @@ -211,6 +234,7 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile f .withPartition(partition) .equalityFieldIds(equalityFieldIds) .withKeyMetadata(file.keyMetadata()) + .encryption(nativeEncryption) .buildEqualityWriter(); case AVRO: @@ -236,6 +260,11 @@ public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile f @Override public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { + NativeFileCryptoParameters nativeEncryption = null; + if (file.useNativeEncryption()) { + nativeEncryption = file.nativeEncryptionParameters(); + } + try { switch (format) { case PARQUET: @@ -249,6 +278,7 @@ public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile .withPartition(partition) .withKeyMetadata(file.keyMetadata()) .transformPaths(path -> UTF8String.fromString(path.toString())) + .encryption(nativeEncryption) .buildPositionWriter(); case AVRO: