diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index a4338ec07a5f..134fded452fd 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -90,12 +90,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations private String tableKeyId; private int encryptionDekLength; - // keys loaded from the latest metadata - private Optional<List<EncryptedKey>> encryptedKeysFromMetadata = Optional.empty(); - - // keys added to EM (e.g. as a result of a FileAppend) but not committed into the latest metadata - // yet - private Optional<List<EncryptedKey>> encryptedKeysPending = Optional.empty(); + private List<EncryptedKey> encryptedKeys = List.of(); protected HiveTableOperations( Configuration conf, @@ -156,12 +151,9 @@ public EncryptionManager encryption() { encryptionProperties.put( TableProperties.ENCRYPTION_DEK_LENGTH, String.valueOf(encryptionDekLength)); - List<EncryptedKey> keys = Lists.newLinkedList(); - encryptedKeysFromMetadata.ifPresent(keys::addAll); - encryptedKeysPending.ifPresent(keys::addAll); - encryptionManager = - EncryptionUtil.createEncryptionManager(keys, encryptionProperties, keyManagementClient); + encryptionManager = EncryptionUtil.createEncryptionManager( + encryptedKeys, encryptionProperties, keyManagementClient); } else { return PlaintextEncryptionManager.instance(); } @@ -215,24 +207,20 @@ the table key parameter (along with existing snapshots) in the file, making the ? 
Integer.parseInt(dekLengthFromHMS) : TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT; - encryptedKeysFromMetadata = Optional.ofNullable(current().encryptionKeys()); + encryptedKeys = + Optional.ofNullable(current().encryptionKeys()) + .map(Lists::newLinkedList) + .orElseGet(Lists::newLinkedList); if (encryptionManager != null) { - encryptedKeysPending = Optional.of(Lists.newLinkedList()); - Set<String> keyIdsFromMetadata = - encryptedKeysFromMetadata.orElseGet(Lists::newLinkedList).stream() - .map(EncryptedKey::keyId) - .collect(Collectors.toSet()); + Set<String> keyIdsFromMetadata = encryptedKeys.stream().map(EncryptedKey::keyId).collect(Collectors.toSet()); for (EncryptedKey keyFromEM : EncryptionUtil.encryptionKeys(encryptionManager).values()) { if (!keyIdsFromMetadata.contains(keyFromEM.keyId())) { - encryptedKeysPending.get().add(keyFromEM); + encryptedKeys.add(keyFromEM); } } - - } else { - encryptedKeysPending = Optional.empty(); } // Force re-creation of encryption manager with updated keys diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java index c71bd28706c9..a26dd8dc3fc0 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java @@ -105,8 +105,8 @@ private static List<DataFile> currentDataFiles(Table table) { @TestTemplate public void testRefresh() { - catalog.initialize(catalogName, catalogConfig); - Table table = catalog.loadTable(tableIdent); + validationCatalog.initialize(catalogName, catalogConfig); + Table table = validationCatalog.loadTable(tableIdent); assertThat(currentDataFiles(table)).isNotEmpty(); @@ -117,10 +117,26 @@ public void testRefresh() { } @TestTemplate - public void testTransaction() { - catalog.initialize(catalogName, catalogConfig); + public void testAppendTransaction() { + validationCatalog.initialize(catalogName, 
catalogConfig); + Table table = validationCatalog.loadTable(tableIdent); - Table table = catalog.loadTable(tableIdent); + List<DataFile> dataFiles = currentDataFiles(table); + Transaction transaction = table.newTransaction(); + AppendFiles append = transaction.newAppend(); + + // add an arbitrary datafile + append.appendFile(dataFiles.get(0)); + append.commit(); + transaction.commitTransaction(); + + assertThat(currentDataFiles(table)).hasSize(dataFiles.size() + 1); + } + + @TestTemplate + public void testConcurrentAppendTransactions() { + validationCatalog.initialize(catalogName, catalogConfig); + Table table = validationCatalog.loadTable(tableIdent); List<DataFile> dataFiles = currentDataFiles(table); Transaction transaction = table.newTransaction(); @@ -128,10 +144,46 @@ public void testTransaction() { // add an arbitrary datafile append.appendFile(dataFiles.get(0)); + + // append to the table in the meantime. use a separate load to avoid shared operations + validationCatalog.loadTable(tableIdent).newFastAppend().appendFile(dataFiles.get(0)).commit(); + append.commit(); transaction.commitTransaction(); - assertThat(currentDataFiles(table).size()).isEqualTo(dataFiles.size() + 1); + assertThat(currentDataFiles(table)).hasSize(dataFiles.size() + 2); + } + + // See CatalogTests#testConcurrentReplaceTransactions + @TestTemplate + public void testConcurrentReplaceTransactions() { + validationCatalog.initialize(catalogName, catalogConfig); + + Table table = validationCatalog.loadTable(tableIdent); + DataFile file = currentDataFiles(table).get(0); + Schema schema = table.schema(); + + // Write data for a replace transaction that will be committed later + Transaction secondReplace = + validationCatalog + .buildTable(tableIdent, schema) + .withProperty("encryption.key-id", UnitestKMS.MASTER_KEY_NAME1) + .replaceTransaction(); + secondReplace.newFastAppend().appendFile(file).commit(); + + // Commit another replace transaction first + Transaction firstReplace = + validationCatalog + 
.buildTable(tableIdent, schema) + .withProperty("encryption.key-id", UnitestKMS.MASTER_KEY_NAME1) + .replaceTransaction(); + firstReplace.newFastAppend().appendFile(file).commit(); + firstReplace.commitTransaction(); + + secondReplace.commitTransaction(); + + Table afterSecondReplace = validationCatalog.loadTable(tableIdent); + assertThat(currentDataFiles(afterSecondReplace)).hasSize(1); } @TestTemplate