diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 814ed978c4ad..6bece9b5f5db 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -48,6 +48,8 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.KeyManagementClient; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -161,6 +163,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private CloseableGroup closeables = null; private Set endpoints; private Supplier> mutationHeaders = Map::of; + private KeyManagementClient keyManagementClient = null; private String namespaceSeparator = null; public RESTSessionCatalog() { @@ -264,6 +267,12 @@ public void initialize(String name, Map unresolved) { mergedProps, RESTCatalogProperties.METRICS_REPORTING_ENABLED, RESTCatalogProperties.METRICS_REPORTING_ENABLED_DEFAULT); + + if (mergedProps.containsKey(CatalogProperties.ENCRYPTION_KMS_IMPL)) { + this.keyManagementClient = EncryptionUtil.createKmsClient(mergedProps); + this.closeables.addCloseable(this.keyManagementClient); + } + this.namespaceSeparator = PropertyUtil.propertyAsString( mergedProps, @@ -471,6 +480,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { Map::of, mutationHeaders, tableFileIO(context, tableConf, response.credentials()), + keyManagementClient, tableMetadata, endpoints); @@ -551,6 +561,7 @@ public Table registerTable( Map::of, mutationHeaders, tableFileIO(context, tableConf, response.credentials()), + keyManagementClient, 
response.tableMetadata(), endpoints); @@ -815,6 +826,7 @@ public Table create() { Map::of, mutationHeaders, tableFileIO(context, tableConf, response.credentials()), + keyManagementClient, response.tableMetadata(), endpoints); @@ -843,6 +855,7 @@ public Transaction createTransaction() { Map::of, mutationHeaders, tableFileIO(context, tableConf, response.credentials()), + keyManagementClient, RESTTableOperations.UpdateType.CREATE, createChanges(meta), meta, @@ -907,6 +920,7 @@ public Transaction replaceTransaction() { Map::of, mutationHeaders, tableFileIO(context, tableConf, response.credentials()), + keyManagementClient, RESTTableOperations.UpdateType.REPLACE, changes.build(), base, @@ -1047,6 +1061,7 @@ private FileIO tableFileIO( * @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation * requests (POST/DELETE) * @param fileIO the FileIO implementation for reading and writing table metadata and data files + * @param kmsClient the {@link KeyManagementClient} for encrypted tables * @param current the current table metadata * @param supportedEndpoints the set of supported REST endpoints * @return a new RESTTableOperations instance @@ -1057,10 +1072,18 @@ protected RESTTableOperations newTableOps( Supplier> readHeaders, Supplier> mutationHeaderSupplier, FileIO fileIO, + KeyManagementClient kmsClient, TableMetadata current, Set supportedEndpoints) { return new RESTTableOperations( - restClient, path, readHeaders, mutationHeaderSupplier, fileIO, current, supportedEndpoints); + restClient, + path, + readHeaders, + mutationHeaderSupplier, + fileIO, + kmsClient, + current, + supportedEndpoints); } /** @@ -1077,6 +1100,7 @@ protected RESTTableOperations newTableOps( * @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation * requests (POST/DELETE) * @param fileIO the FileIO implementation for reading and writing table metadata and data files + * @param kmsClient the {@link KeyManagementClient} for 
encrypted tables * @param updateType the {@link RESTTableOperations.UpdateType} being performed * @param createChanges the list of metadata updates to apply during table creation or replacement * @param current the current table metadata (may be null for CREATE operations) @@ -1089,6 +1113,7 @@ protected RESTTableOperations newTableOps( Supplier> readHeaders, Supplier> mutationHeaderSupplier, FileIO fileIO, + KeyManagementClient kmsClient, RESTTableOperations.UpdateType updateType, List createChanges, TableMetadata current, @@ -1099,6 +1124,7 @@ protected RESTTableOperations newTableOps( readHeaders, mutationHeaderSupplier, fileIO, + kmsClient, updateType, createChanges, current, diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java index d2a6ab618ca8..f6dab98e41be 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java @@ -21,9 +21,12 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.LocationProviders; import org.apache.iceberg.MetadataUpdate; import org.apache.iceberg.SnapshotRef; @@ -32,17 +35,25 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.UpdateRequirement; import org.apache.iceberg.UpdateRequirements; +import org.apache.iceberg.encryption.EncryptedKey; +import org.apache.iceberg.encryption.EncryptingFileIO; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.KeyManagementClient; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; +import 
org.apache.iceberg.encryption.StandardEncryptionManager; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.rest.requests.UpdateTableRequest; import org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.util.PropertyUtil; class RESTTableOperations implements TableOperations { private static final String METADATA_FOLDER_NAME = "metadata"; @@ -58,17 +69,26 @@ enum UpdateType { private final Supplier> readHeaders; private final Supplier> mutationHeaders; private final FileIO io; + private final KeyManagementClient keyManagementClient; private final List createChanges; private final TableMetadata replaceBase; private final Set endpoints; private UpdateType updateType; private TableMetadata current; + private EncryptionManager encryptionManager; + private EncryptingFileIO encryptingFileIO; + private String tableKeyId; + private int encryptionDekLength; + + private List encryptedKeys = List.of(); + RESTTableOperations( RESTClient client, String path, Supplier> headers, FileIO io, + KeyManagementClient keyManagementClient, TableMetadata current, Set endpoints) { this( @@ -77,6 +97,7 @@ enum UpdateType { headers, headers, io, + keyManagementClient, UpdateType.SIMPLE, Lists.newArrayList(), current, @@ -88,11 +109,22 @@ enum UpdateType { String path, Supplier> headers, FileIO io, + KeyManagementClient keyManagementClient, UpdateType updateType, List createChanges, TableMetadata current, Set endpoints) { - this(client, path, headers, 
headers, io, updateType, createChanges, current, endpoints); + this( + client, + path, + headers, + headers, + io, + keyManagementClient, + updateType, + createChanges, + current, + endpoints); } RESTTableOperations( @@ -101,6 +133,7 @@ enum UpdateType { Supplier> readHeaders, Supplier> mutationHeaders, FileIO io, + KeyManagementClient keyManagementClient, TableMetadata current, Set endpoints) { this( @@ -109,6 +142,7 @@ enum UpdateType { readHeaders, mutationHeaders, io, + keyManagementClient, UpdateType.SIMPLE, Lists.newArrayList(), current, @@ -121,6 +155,7 @@ enum UpdateType { Supplier> readHeaders, Supplier> mutationHeaders, FileIO io, + KeyManagementClient keyManagementClient, UpdateType updateType, List createChanges, TableMetadata current, @@ -130,6 +165,7 @@ enum UpdateType { this.readHeaders = readHeaders; this.mutationHeaders = mutationHeaders; this.io = io; + this.keyManagementClient = keyManagementClient; this.updateType = updateType; this.createChanges = createChanges; this.replaceBase = current; @@ -139,6 +175,10 @@ enum UpdateType { this.current = current; } this.endpoints = endpoints; + + // N.B. We don't use this.current due to it being null for the CREATE update type; we still + want encryption configured for this case.
+ encryptionPropsFromMetadata(current); } @Override @@ -156,6 +196,21 @@ public TableMetadata refresh() { @Override public void commit(TableMetadata base, TableMetadata metadata) { Endpoint.check(endpoints, Endpoint.V1_UPDATE_TABLE); + + if (encryption() instanceof StandardEncryptionManager) { + // Add encryption keys to the to-be-committed metadata + TableMetadata.Builder builder = TableMetadata.buildFrom(metadata); + for (Map.Entry entry : + EncryptionUtil.encryptionKeys(encryption()).entrySet()) { + builder.addEncryptionKey(entry.getValue()); + } + commitInternal(base, builder.build()); + } else { + commitInternal(base, metadata); + } + } + + private void commitInternal(TableMetadata base, TableMetadata metadata) { Consumer errorHandler; List requirements; List updates; @@ -196,6 +251,19 @@ public void commit(TableMetadata base, TableMetadata metadata) { String.format("Update type %s is not supported", updateType)); } + if (base != null) { + boolean encryptionKeyRemoved = + base.properties().containsKey(TableProperties.ENCRYPTION_TABLE_KEY) + && !metadata.properties().containsKey(TableProperties.ENCRYPTION_TABLE_KEY); + Preconditions.checkArgument(!encryptionKeyRemoved, "Cannot remove key in encrypted table"); + + boolean encryptionKeyUnchanged = + Objects.equals( + base.properties().get(TableProperties.ENCRYPTION_TABLE_KEY), + metadata.properties().get(TableProperties.ENCRYPTION_TABLE_KEY)); + Preconditions.checkArgument(encryptionKeyUnchanged, "Cannot modify key in encrypted table"); + } + UpdateTableRequest request = new UpdateTableRequest(requirements, updates); // the error handler will throw necessary exceptions like CommitFailedException and @@ -245,7 +313,44 @@ private boolean reconcileOnSimpleUpdate( @Override public FileIO io() { - return io; + if (tableKeyId == null) { + return io; + } + + if (encryptingFileIO == null) { + encryptingFileIO = EncryptingFileIO.combine(io, encryption()); + } + + return encryptingFileIO; + } + + @Override + public 
EncryptionManager encryption() { + if (encryptionManager != null) { + return encryptionManager; + } + + if (tableKeyId != null) { + Preconditions.checkArgument( + keyManagementClient != null, + "Cannot create encryption manager without a key management client. Consider setting the '%s' catalog property", + CatalogProperties.ENCRYPTION_KMS_IMPL); + + Map encryptionProperties = + ImmutableMap.of( + TableProperties.ENCRYPTION_TABLE_KEY, + tableKeyId, + TableProperties.ENCRYPTION_DEK_LENGTH, + String.valueOf(encryptionDekLength)); + + encryptionManager = + EncryptionUtil.createEncryptionManager( + encryptedKeys, encryptionProperties, keyManagementClient); + } else { + return PlaintextEncryptionManager.instance(); + } + + return encryptionManager; } private static Long expectedSnapshotIdIfSnapshotAddOnly(List updates) { @@ -285,6 +390,45 @@ private static Long expectedSnapshotIdIfSnapshotAddOnly(List upd return addedSnapshotId; } + private void encryptionPropsFromMetadata(TableMetadata metadata) { + if (metadata == null || metadata.properties() == null) { + return; + } + + // Refresh encryption-related properties and keys on new/refreshed metadata + Map tableProperties = metadata.properties(); + tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY); + + if (tableKeyId != null) { + encryptionDekLength = + PropertyUtil.propertyAsInt( + tableProperties, + TableProperties.ENCRYPTION_DEK_LENGTH, + TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); + + encryptedKeys = + Optional.ofNullable(metadata.encryptionKeys()) + .map(Lists::newLinkedList) + .orElseGet(Lists::newLinkedList); + + // Include pending encryption keys from the encryption manager + if (encryptionManager != null) { + Set keyIdsFromMetadata = + encryptedKeys.stream().map(EncryptedKey::keyId).collect(Collectors.toSet()); + + for (EncryptedKey keyFromEM : EncryptionUtil.encryptionKeys(encryptionManager).values()) { + if (!keyIdsFromMetadata.contains(keyFromEM.keyId())) { + 
encryptedKeys.add(keyFromEM); + } + } + } + } + + // Force re-creation of encryption manager + encryptingFileIO = null; + encryptionManager = null; + } + private TableMetadata updateCurrentMetadata(LoadTableResponse response) { // LoadTableResponse is used to deserialize the response, but config is not allowed by the REST // spec so it can be @@ -292,6 +436,7 @@ private TableMetadata updateCurrentMetadata(LoadTableResponse response) { if (current == null || !Objects.equals(current.metadataFileLocation(), response.metadataLocation())) { this.current = checkUUID(current, response.tableMetadata()); + encryptionPropsFromMetadata(current); } return current; diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 753d8cb247c2..2a9fb7e75643 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -73,6 +73,7 @@ import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.KeyManagementClient; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NotAuthorizedException; @@ -3120,6 +3121,7 @@ public void testCommitStateUnknownNotReconciled() { } @Test + @SuppressWarnings("MethodLength") public void testCustomTableOperationsInjection() throws IOException { AtomicBoolean customTableOpsCalled = new AtomicBoolean(); AtomicBoolean customTransactionTableOpsCalled = new AtomicBoolean(); @@ -3135,9 +3137,17 @@ class CustomRESTTableOperations extends RESTTableOperations { String path, Supplier> headers, FileIO fileIO, + KeyManagementClient keyManagementClient, TableMetadata current, Set supportedEndpoints) { - super(client, path, () -> customHeaders, fileIO, current, 
supportedEndpoints); + super( + client, + path, + () -> customHeaders, + fileIO, + keyManagementClient, + current, + supportedEndpoints); customTableOpsCalled.set(true); } @@ -3146,6 +3156,7 @@ class CustomRESTTableOperations extends RESTTableOperations { String path, Supplier> headers, FileIO fileIO, + KeyManagementClient keyManagementClient, RESTTableOperations.UpdateType updateType, List createChanges, TableMetadata current, @@ -3155,6 +3166,7 @@ class CustomRESTTableOperations extends RESTTableOperations { path, () -> customHeaders, fileIO, + keyManagementClient, updateType, createChanges, current, @@ -3178,11 +3190,12 @@ protected RESTTableOperations newTableOps( Supplier> readHeaders, Supplier> mutationHeaders, FileIO fileIO, + KeyManagementClient kmsClient, TableMetadata current, Set supportedEndpoints) { RESTTableOperations ops = new CustomRESTTableOperations( - restClient, path, mutationHeaders, fileIO, current, supportedEndpoints); + restClient, path, mutationHeaders, fileIO, kmsClient, current, supportedEndpoints); RESTTableOperations spy = Mockito.spy(ops); capturedOps.set(spy); return spy; @@ -3195,6 +3208,7 @@ protected RESTTableOperations newTableOps( Supplier> readHeaders, Supplier> mutationHeaders, FileIO fileIO, + KeyManagementClient kmsClient, RESTTableOperations.UpdateType updateType, List createChanges, TableMetadata current, @@ -3205,6 +3219,7 @@ protected RESTTableOperations newTableOps( path, mutationHeaders, fileIO, + kmsClient, updateType, createChanges, current, diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java index 3dee6e1e1d54..aebc4a4ca416 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java @@ -32,6 +32,7 @@ import org.apache.iceberg.encryption.UnitestKMS; import 
org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.spark.CatalogTestBase; @@ -57,6 +58,15 @@ protected static Object[][] parameters() { SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), appendCatalogEncryptionProperties(SparkCatalogConfig.HIVE.properties()) + }, + { + SparkCatalogConfig.REST.catalogName(), + SparkCatalogConfig.REST.implementation(), + appendCatalogEncryptionProperties( + ImmutableMap.builder() + .putAll(SparkCatalogConfig.REST.properties()) + .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) + .build()) } }; } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java index 85e7f48b59d7..1077c0154a47 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java @@ -50,6 +50,7 @@ import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.spark.CatalogTestBase; @@ -76,6 +77,15 @@ protected static Object[][] parameters() { SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), appendCatalogEncryptionProperties(SparkCatalogConfig.HIVE.properties()) + }, + { + 
SparkCatalogConfig.REST.catalogName(), + SparkCatalogConfig.REST.implementation(), + appendCatalogEncryptionProperties( + ImmutableMap.builder() + .putAll(SparkCatalogConfig.REST.properties()) + .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) + .build()) } }; } @@ -112,8 +122,8 @@ private static List currentDataFiles(Table table) { @TestTemplate public void testRefresh() { - catalog.initialize(catalogName, catalogConfig); - Table table = catalog.loadTable(tableIdent); + validationCatalog.initialize(catalogName, catalogConfig); + Table table = validationCatalog.loadTable(tableIdent); assertThat(currentDataFiles(table)).isNotEmpty(); @@ -124,10 +134,26 @@ public void testRefresh() { } @TestTemplate - public void testTransaction() { - catalog.initialize(catalogName, catalogConfig); + public void testAppendTransaction() { + validationCatalog.initialize(catalogName, catalogConfig); + Table table = validationCatalog.loadTable(tableIdent); - Table table = catalog.loadTable(tableIdent); + List dataFiles = currentDataFiles(table); + Transaction transaction = table.newTransaction(); + AppendFiles append = transaction.newAppend(); + + // add an arbitrary datafile + append.appendFile(dataFiles.get(0)); + append.commit(); + transaction.commitTransaction(); + + assertThat(currentDataFiles(table)).hasSize(dataFiles.size() + 1); + } + + @TestTemplate + public void testConcurrentAppendTransactions() { + validationCatalog.initialize(catalogName, catalogConfig); + Table table = validationCatalog.loadTable(tableIdent); List dataFiles = currentDataFiles(table); Transaction transaction = table.newTransaction(); @@ -135,10 +161,46 @@ public void testTransaction() { // add an arbitrary datafile append.appendFile(dataFiles.get(0)); + + // append to the table in the meantime. 
use a separate load to avoid shared operations + validationCatalog.loadTable(tableIdent).newFastAppend().appendFile(dataFiles.get(0)).commit(); + append.commit(); transaction.commitTransaction(); - assertThat(currentDataFiles(table).size()).isEqualTo(dataFiles.size() + 1); + assertThat(currentDataFiles(table)).hasSize(dataFiles.size() + 2); + } + + // See CatalogTests#testConcurrentReplaceTransactions + @TestTemplate + public void testConcurrentReplaceTransactions() { + validationCatalog.initialize(catalogName, catalogConfig); + + Table table = validationCatalog.loadTable(tableIdent); + DataFile file = currentDataFiles(table).get(0); + Schema schema = table.schema(); + + // Write data for a replace transaction that will be committed later + Transaction secondReplace = + validationCatalog + .buildTable(tableIdent, schema) + .withProperty("encryption.key-id", UnitestKMS.MASTER_KEY_NAME1) + .replaceTransaction(); + secondReplace.newFastAppend().appendFile(file).commit(); + + // Commit another replace transaction first + Transaction firstReplace = + validationCatalog + .buildTable(tableIdent, schema) + .withProperty("encryption.key-id", UnitestKMS.MASTER_KEY_NAME1) + .replaceTransaction(); + firstReplace.newFastAppend().appendFile(file).commit(); + firstReplace.commitTransaction(); + + secondReplace.commitTransaction(); + + Table afterSecondReplace = validationCatalog.loadTable(tableIdent); + assertThat(currentDataFiles(afterSecondReplace)).hasSize(1); } @TestTemplate