diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 9fa52d52ea5d..bda983a6c170 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -214,7 +214,7 @@ protected void refreshFromMetadataLocation( this.shouldRefresh = false; } - private String metadataFileLocation(TableMetadata metadata, String filename) { + protected String metadataFileLocation(TableMetadata metadata, String filename) { String metadataLocation = metadata.properties().get(TableProperties.WRITE_METADATA_LOCATION); if (metadataLocation != null) { diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java index 1b35cb82f03e..51b73cc43d8f 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -75,7 +75,7 @@ public static KeyManagementClient createKmsClient(Map catalogPro return kmsClient; } - static EncryptionManager createEncryptionManager( + public static EncryptionManager createEncryptionManager( List keys, Map tableProperties, KeyManagementClient kmsClient) { Preconditions.checkArgument(kmsClient != null, "Invalid KMS client: null"); String tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY); @@ -96,7 +96,7 @@ static EncryptionManager createEncryptionManager( "Invalid data key length: %s (must be 16, 24, or 32)", dataKeyLength); - return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient); + return new StandardEncryptionManager(keys, tableKeyId, dataKeyLength, kmsClient); } public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) { @@ -128,6 +128,14 @@ public static ByteBuffer decryptManifestListKeyMetadata( return ByteBuffer.wrap(decryptedKeyMetadata); } + public static Map encryptionKeys(EncryptionManager em) { + Preconditions.checkState( + em instanceof StandardEncryptionManager, + "Retrieving encryption keys requires a StandardEncryptionManager"); + StandardEncryptionManager sem = (StandardEncryptionManager) em; + return sem.encryptionKeys(); + } + /** * Encrypts the key metadata for a manifest list. * diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index d000221bb633..b68cf4ed67e4 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.security.SecureRandom; import java.util.Base64; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.iceberg.TableProperties; @@ -46,9 +47,19 @@ private class TransientEncryptionState { private final Map encryptionKeys; private final LoadingCache unwrappedKeyCache; - private TransientEncryptionState(KeyManagementClient kmsClient) { + private TransientEncryptionState(KeyManagementClient kmsClient, List keys) { this.kmsClient = kmsClient; this.encryptionKeys = Maps.newLinkedHashMap(); + + if (keys != null) { + for (EncryptedKey key : keys) { + encryptionKeys.put( + key.keyId(), + new BaseEncryptedKey( + key.keyId(), key.encryptedKeyMetadata(), key.encryptedById(), key.properties())); + } + } + this.unwrappedKeyCache = Caffeine.newBuilder() .expireAfterWrite(1, TimeUnit.HOURS) @@ -64,12 +75,25 @@ private TransientEncryptionState(KeyManagementClient kmsClient) { private transient volatile SecureRandom lazyRNG = null; /** + * @deprecated will be removed in 2.0. + */ + @Deprecated + public StandardEncryptionManager( + String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { + this(List.of(), tableKeyId, dataKeyLength, kmsClient); + } + + /** + * @param keys encryption keys from table metadata * @param tableKeyId table encryption key id * @param dataKeyLength length of data encryption key (16/24/32 bytes) * @param kmsClient Client of KMS used to wrap/unwrap keys in envelope encryption */ public StandardEncryptionManager( - String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) { + List keys, + String tableKeyId, + int dataKeyLength, + KeyManagementClient kmsClient) { Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null"); Preconditions.checkArgument( dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32, @@ -77,7 +101,7 @@ public StandardEncryptionManager( dataKeyLength); Preconditions.checkNotNull(kmsClient, "Invalid KMS client: null"); this.tableKeyId = tableKeyId; - this.transientState = new TransientEncryptionState(kmsClient); + this.transientState = new TransientEncryptionState(kmsClient, keys); this.dataKeyLength = dataKeyLength; } @@ -134,6 +158,14 @@ public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) { return transientState.kmsClient.unwrapKey(wrappedSecretKey, tableKeyId); } + Map encryptionKeys() { + if (transientState == null) { + throw new IllegalStateException("Cannot return the encryption keys after serialization"); + } + + return transientState.encryptionKeys; + } + private String keyEncryptionKeyID() { if (transientState == null) { throw new IllegalStateException("Cannot return the current key after serialization"); diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index b0d9e224634d..c154e43304c6 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.hive; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; @@ -45,6 +46,8 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.KeyManagementClient; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchIcebergViewException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; @@ -88,6 +91,7 @@ public class HiveCatalog extends BaseMetastoreViewCatalog private String name; private Configuration conf; private FileIO fileIO; + private KeyManagementClient keyManagementClient; private ClientPool clients; private boolean listAllTables = false; private Map catalogProperties; @@ -122,6 +126,10 @@ public void initialize(String inputName, Map properties) { ? new HadoopFileIO(conf) : CatalogUtil.loadFileIO(fileIOImpl, properties, conf); + if (catalogProperties.containsKey(CatalogProperties.ENCRYPTION_KMS_IMPL)) { + this.keyManagementClient = EncryptionUtil.createKmsClient(properties); + } + this.clients = new CachedClientPool(conf, properties); } @@ -686,7 +694,8 @@ private boolean isValidateNamespace(Namespace namespace) { public TableOperations newTableOps(TableIdentifier tableIdentifier) { String dbName = tableIdentifier.namespace().level(0); String tableName = tableIdentifier.name(); - return new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName); + return new HiveTableOperations( + conf, clients, fileIO, keyManagementClient, name, dbName, tableName); } @Override @@ -815,6 +824,15 @@ protected Map properties() { return catalogProperties == null ? ImmutableMap.of() : catalogProperties; } + @Override + public void close() throws IOException { + super.close(); + + if (keyManagementClient != null) { + keyManagementClient.close(); + } + } + @VisibleForTesting void setListAllTables(boolean listAllTables) { this.listAllTables = listAllTables; diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index f0183fc5c578..1f2bdab7cffb 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -19,6 +19,8 @@ package org.apache.iceberg.hive; import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -32,8 +34,17 @@ import org.apache.iceberg.BaseMetastoreOperations; import org.apache.iceberg.BaseMetastoreTableOperations; import org.apache.iceberg.ClientPool; +import org.apache.iceberg.LocationProviders; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.encryption.EncryptedKey; +import org.apache.iceberg.encryption.EncryptingFileIO; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.KeyManagementClient; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; +import org.apache.iceberg.encryption.StandardEncryptionManager; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; @@ -41,7 +52,9 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.ConfigProperties; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,18 +79,27 @@ public class HiveTableOperations extends BaseMetastoreTableOperations private final long maxHiveTablePropertySize; private final int metadataRefreshMaxRetries; private final FileIO fileIO; + private final KeyManagementClient keyManagementClient; private final ClientPool metaClients; + private EncryptionManager encryptionManager; + private EncryptingFileIO encryptingFileIO; + private String tableKeyId; + private int encryptionDekLength; + private List encryptedKeysFromMetadata; + protected HiveTableOperations( Configuration conf, ClientPool metaClients, FileIO fileIO, + KeyManagementClient keyManagementClient, String catalogName, String database, String table) { this.conf = conf; this.metaClients = metaClients; this.fileIO = fileIO; + this.keyManagementClient = keyManagementClient; this.fullName = catalogName + "." + database + "." + table; this.catalogName = catalogName; this.database = database; @@ -97,12 +119,48 @@ protected String tableName() { @Override public FileIO io() { - return fileIO; + if (tableKeyId == null) { + return fileIO; + } + + if (encryptingFileIO == null) { + encryptingFileIO = EncryptingFileIO.combine(fileIO, encryption()); + } + + return encryptingFileIO; + } + + @Override + public EncryptionManager encryption() { + if (encryptionManager != null) { + return encryptionManager; + } + + if (tableKeyId != null) { + if (keyManagementClient == null) { + throw new RuntimeException( + "Cant create encryption manager, because key management client is not set"); + } + + Map encryptionProperties = Maps.newHashMap(); + encryptionProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, tableKeyId); + encryptionProperties.put( + TableProperties.ENCRYPTION_DEK_LENGTH, String.valueOf(encryptionDekLength)); + encryptionManager = + EncryptionUtil.createEncryptionManager( + encryptedKeysFromMetadata, encryptionProperties, keyManagementClient); + } else { + return PlaintextEncryptionManager.instance(); + } + + return encryptionManager; } @Override protected void doRefresh() { String metadataLocation = null; + String tableKeyIdFromHMS = null; + String dekLengthFromHMS = null; try { Table table = metaClients.run(client -> client.getTable(database, tableName)); @@ -112,7 +170,12 @@ protected void doRefresh() { HiveOperationsBase.validateTableIsIceberg(table, fullName); metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP); - + /* Table key ID must be retrieved from a catalog service, and not from untrusted storage + (e.g. metadata json file) that can be tampered with. For example, an attacker can remove + the table key parameter (along with existing snapshots) in the file, making the writers + produce unencrypted files. Table key ID is taken directly from HMS catalog */ + tableKeyIdFromHMS = table.getParameters().get(TableProperties.ENCRYPTION_TABLE_KEY); + dekLengthFromHMS = table.getParameters().get(TableProperties.ENCRYPTION_DEK_LENGTH); } catch (NoSuchObjectException e) { if (currentMetadataLocation() != null) { throw new NoSuchTableException("No such table: %s.%s", database, tableName); @@ -129,13 +192,44 @@ protected void doRefresh() { } refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries); + + if (tableKeyIdFromHMS != null) { + checkEncryptionProperties(tableKeyIdFromHMS, dekLengthFromHMS); + + tableKeyId = tableKeyIdFromHMS; + encryptionDekLength = + (dekLengthFromHMS != null) + ? Integer.parseInt(dekLengthFromHMS) + : TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT; + + encryptedKeysFromMetadata = current().encryptionKeys(); + // Force re-creation of encryption manager with updated keys + encryptingFileIO = null; + encryptionManager = null; + } } @SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"}) @Override protected void doCommit(TableMetadata base, TableMetadata metadata) { boolean newTable = base == null; - String newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata); + encryptionPropsFromMetadata(metadata.properties()); + + String newMetadataLocation; + EncryptionManager encrManager = encryption(); + if (encrManager instanceof StandardEncryptionManager) { + // Add new encryption keys to the metadata + TableMetadata.Builder builder = TableMetadata.buildFrom(metadata); + for (Map.Entry entry : + EncryptionUtil.encryptionKeys(encrManager).entrySet()) { + builder.addEncryptionKey(entry.getValue()); + } + + newMetadataLocation = writeNewMetadataIfRequired(newTable, builder.build()); + } else { + newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata); + } + boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf); boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false); @@ -194,6 +288,10 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { .collect(Collectors.toSet()); } + if (removedProps.contains(TableProperties.ENCRYPTION_TABLE_KEY)) { + throw new RuntimeException("Cannot remove key in encrypted table"); + } + HMSTablePropertyHelper.updateHmsTableForIcebergTable( newMetadataLocation, tbl, @@ -321,6 +419,54 @@ public ClientPool metaClients() { return metaClients; } + @Override + public TableOperations temp(TableMetadata uncommittedMetadata) { + return new TableOperations() { + @Override + public TableMetadata current() { + return uncommittedMetadata; + } + + @Override + public TableMetadata refresh() { + throw new UnsupportedOperationException( + "Cannot call refresh on temporary table operations"); + } + + @Override + public void commit(TableMetadata base, TableMetadata metadata) { + throw new UnsupportedOperationException("Cannot call commit on temporary table operations"); + } + + @Override + public String metadataFileLocation(String fileName) { + return HiveTableOperations.this.metadataFileLocation(uncommittedMetadata, fileName); + } + + @Override + public LocationProvider locationProvider() { + return LocationProviders.locationsFor( + uncommittedMetadata.location(), uncommittedMetadata.properties()); + } + + @Override + public FileIO io() { + HiveTableOperations.this.encryptionPropsFromMetadata(uncommittedMetadata.properties()); + return HiveTableOperations.this.io(); + } + + @Override + public EncryptionManager encryption() { + return HiveTableOperations.this.encryption(); + } + + @Override + public long newSnapshotId() { + return HiveTableOperations.this.newSnapshotId(); + } + }; + } + /** * Returns if the hive engine related values should be enabled on the table, or not. * @@ -375,6 +521,44 @@ private static boolean hiveLockEnabled(TableMetadata metadata, Configuration con ConfigProperties.LOCK_HIVE_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT); } + private void encryptionPropsFromMetadata(Map tableProperties) { + if (tableKeyId == null) { + tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY); + } + + if (tableKeyId != null && encryptionDekLength <= 0) { + String dekLength = tableProperties.get(TableProperties.ENCRYPTION_DEK_LENGTH); + encryptionDekLength = + (dekLength == null) + ? TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT + : Integer.parseInt(dekLength); + } + } + + private void checkEncryptionProperties(String encryptionKeyIdFromHMS, String dekLengthFromHMS) { + Map propertiesFromMetadata = current().properties(); + + String encryptionKeyIdFromMetadata = + propertiesFromMetadata.get(TableProperties.ENCRYPTION_TABLE_KEY); + if (!Objects.equals(encryptionKeyIdFromHMS, encryptionKeyIdFromMetadata)) { + String errMsg = + String.format( + "Metadata file might have been modified. Encryption key id %s differs from HMS value %s", + encryptionKeyIdFromMetadata, encryptionKeyIdFromHMS); + throw new RuntimeException(errMsg); + } + + String dekLengthFromMetadata = + propertiesFromMetadata.get(TableProperties.ENCRYPTION_DEK_LENGTH); + if (!Objects.equals(dekLengthFromHMS, dekLengthFromMetadata)) { + String errMsg = + String.format( + "Metadata file might have been modified. DEK length %s differs from HMS value %s", + dekLengthFromMetadata, dekLengthFromHMS); + throw new RuntimeException(errMsg); + } + } + @VisibleForTesting HiveLock lockObject(TableMetadata metadata) { if (hiveLockEnabled(metadata, conf)) { diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java index 0ffcb057095f..e62ce9aeaed2 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java @@ -185,6 +185,7 @@ public void before() throws Exception { overriddenHiveConf, spyCachedClientPool, ops.io(), + null, catalog.name(), dbName, tableName)); @@ -615,6 +616,7 @@ public void testNoLockCallsWithNoLock() throws TException { confWithLock, spyCachedClientPool, ops.io(), + null, catalog.name(), TABLE_IDENTIFIER.namespace().level(0), TABLE_IDENTIFIER.name())); diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java new file mode 100644 index 000000000000..6094ab0ccca5 --- /dev/null +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import static org.apache.iceberg.Files.localInput; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.UnitestKMS; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.types.Types; +import org.apache.parquet.crypto.ParquetCryptoRuntimeException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; + +public class TestCTASEncryption extends CatalogTestBase { + private static Map appendCatalogEncryptionProperties(Map props) { + Map newProps = Maps.newHashMap(); + newProps.putAll(props); + newProps.put(CatalogProperties.ENCRYPTION_KMS_IMPL, UnitestKMS.class.getCanonicalName()); + return newProps; + } + + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + protected static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + appendCatalogEncryptionProperties(SparkCatalogConfig.HIVE.properties()) + } + }; + } + + @BeforeEach + public void createTables() { + sql("CREATE TABLE %s (id bigint, data string, float float) USING iceberg ", tableName + "1"); + sql( + "INSERT INTO %s VALUES (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c', float('NaN'))", + tableName + "1"); + + sql( + "CREATE TABLE %s USING iceberg " + + "TBLPROPERTIES ( " + + "'encryption.key-id'='%s')" + + " AS SELECT * from %s", + tableName, UnitestKMS.MASTER_KEY_NAME1, tableName + "1"); + } + + @AfterEach + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + sql("DROP TABLE IF EXISTS %s", tableName + "1"); + } + + @TestTemplate + public void testSelect() { + List expected = + ImmutableList.of(row(1L, "a", 1.0F), row(2L, "b", 2.0F), row(3L, "c", Float.NaN)); + + assertEquals("Should return all expected rows", expected, sql("SELECT * FROM %s", tableName)); + } + + @TestTemplate + public void testDirectDataFileRead() { + List dataFileTable = + sql("SELECT file_path FROM %s.%s", tableName, MetadataTableType.ALL_DATA_FILES); + List dataFiles = + Streams.concat(dataFileTable.stream()) + .map(row -> (String) row[0]) + .collect(Collectors.toList()); + + if (dataFiles.isEmpty()) { + throw new RuntimeException("No data files found for table " + tableName); + } + + Schema schema = new Schema(optional(0, "id", Types.IntegerType.get())); + for (String filePath : dataFiles) { + assertThatThrownBy( + () -> + Parquet.read(localInput(filePath)) + .project(schema) + .callInit() + .build() + .iterator() + .next()) + .isInstanceOf(ParquetCryptoRuntimeException.class) + .hasMessageContaining("Trying to read file with encrypted footer. No keys available"); + } + } +} diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java new file mode 100644 index 000000000000..695a41681b13 --- /dev/null +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import static org.apache.iceberg.Files.localInput; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.encryption.Ciphers; +import org.apache.iceberg.encryption.UnitestKMS; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.types.Types; +import org.apache.parquet.crypto.ParquetCryptoRuntimeException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; + +public class TestTableEncryption extends CatalogTestBase { + private static Map appendCatalogEncryptionProperties(Map props) { + Map newProps = Maps.newHashMap(); + newProps.putAll(props); + newProps.put(CatalogProperties.ENCRYPTION_KMS_IMPL, UnitestKMS.class.getCanonicalName()); + return newProps; + } + + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + protected static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + appendCatalogEncryptionProperties(SparkCatalogConfig.HIVE.properties()) + } + }; + } + + @BeforeEach + public void createTables() { + sql( + "CREATE TABLE %s (id bigint, data string, float float) USING iceberg " + + "TBLPROPERTIES ( " + + "'encryption.key-id'='%s')", + tableName, UnitestKMS.MASTER_KEY_NAME1); + + sql("INSERT INTO %s VALUES (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c', float('NaN'))", tableName); + } + + @AfterEach + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @TestTemplate + public void testSelect() { + List expected = + ImmutableList.of(row(1L, "a", 1.0F), row(2L, "b", 2.0F), row(3L, "c", Float.NaN)); + + assertEquals("Should return all expected rows", expected, sql("SELECT * FROM %s", tableName)); + } + + private static List currentDataFiles(Table table) { + return Streams.stream(table.newScan().planFiles()) + .map(FileScanTask::file) + .collect(Collectors.toList()); + } + + @TestTemplate + public void testRefresh() { + catalog.initialize(catalogName, catalogConfig); + Table table = catalog.loadTable(tableIdent); + + assertThat(currentDataFiles(table)).isNotEmpty(); + + sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0), (6, 'f', float('NaN'))", tableName); + + table.refresh(); + assertThat(currentDataFiles(table)).isNotEmpty(); + } + + @TestTemplate + public void testInsertAndDelete() { + sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0), (6, 'f', float('NaN'))", tableName); + + List expected = + ImmutableList.of( + row(1L, "a", 1.0F), + row(2L, "b", 2.0F), + row(3L, "c", Float.NaN), + row(4L, "d", 4.0F), + row(5L, "e", 5.0F), + row(6L, "f", Float.NaN)); + + assertEquals( + "Should return all expected rows", + expected, + sql("SELECT * FROM %s ORDER BY id", tableName)); + + sql("DELETE FROM %s WHERE id < 4", tableName); + + expected = ImmutableList.of(row(4L, "d", 4.0F), row(5L, "e", 5.0F), row(6L, "f", Float.NaN)); + + assertEquals( + "Should return all expected rows", + expected, + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @TestTemplate + public void testKeyDelete() { + assertThatThrownBy( + () -> sql("ALTER TABLE %s UNSET TBLPROPERTIES (`encryption.key-id`)", tableName)) + .hasMessageContaining("Cannot remove key in encrypted table"); + } + + @TestTemplate + public void testDirectDataFileRead() { + List dataFileTable = + sql("SELECT file_path FROM %s.%s", tableName, MetadataTableType.ALL_DATA_FILES); + List dataFiles = + Streams.concat(dataFileTable.stream()) + .map(row -> (String) row[0]) + .collect(Collectors.toList()); + + if (dataFiles.isEmpty()) { + throw new RuntimeException("No data files found for table " + tableName); + } + + Schema schema = new Schema(optional(0, "id", Types.IntegerType.get())); + for (String filePath : dataFiles) { + assertThatThrownBy( + () -> + Parquet.read(localInput(filePath)) + .project(schema) + .callInit() + .build() + .iterator() + .next()) + .isInstanceOf(ParquetCryptoRuntimeException.class) + .hasMessageContaining("Trying to read file with encrypted footer. No keys available"); + } + } + + @TestTemplate + public void testManifestEncryption() throws IOException { + List manifestFileTable = + sql("SELECT path FROM %s.%s", tableName, MetadataTableType.MANIFESTS); + + List manifestFiles = + Streams.concat(manifestFileTable.stream()) + .map(row -> (String) row[0]) + .collect(Collectors.toList()); + + if (manifestFiles.isEmpty()) { + throw new RuntimeException("No manifest files found for table " + tableName); + } + + String metadataFolderPath = null; + + // Check encryption of manifest files + for (String manifestFilePath : manifestFiles) { + checkMetadataFileEncryption(localInput(manifestFilePath)); + + if (metadataFolderPath == null) { + metadataFolderPath = new File(manifestFilePath).getParent().replaceFirst("file:", ""); + } + } + + if (metadataFolderPath == null) { + throw new RuntimeException("No metadata folder found for table " + tableName); + } + + // Find manifest list and metadata files; check their encryption + File[] listOfMetadataFiles = new File(metadataFolderPath).listFiles(); + boolean foundManifestListFile = false; + + for (File metadataFile : listOfMetadataFiles) { + if (metadataFile.getName().startsWith("snap-")) { + foundManifestListFile = true; + checkMetadataFileEncryption(localInput(metadataFile)); + } + } + + if (!foundManifestListFile) { + throw new RuntimeException("No manifest list files found for table " + tableName); + } + } + + private void checkMetadataFileEncryption(InputFile file) throws IOException { + SeekableInputStream stream = file.newStream(); + byte[] magic = new byte[4]; + stream.read(magic); + stream.close(); + assertThat(magic).isEqualTo(Ciphers.GCM_STREAM_MAGIC_STRING.getBytes(StandardCharsets.UTF_8)); + } +}