Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableCommit;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.encryption.KeyManagementClient;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
Expand Down Expand Up @@ -161,6 +163,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private CloseableGroup closeables = null;
private Set<Endpoint> endpoints;
private Supplier<Map<String, String>> mutationHeaders = Map::of;
private KeyManagementClient keyManagementClient = null;
private String namespaceSeparator = null;

public RESTSessionCatalog() {
Expand Down Expand Up @@ -264,6 +267,12 @@ public void initialize(String name, Map<String, String> unresolved) {
mergedProps,
RESTCatalogProperties.METRICS_REPORTING_ENABLED,
RESTCatalogProperties.METRICS_REPORTING_ENABLED_DEFAULT);

if (mergedProps.containsKey(CatalogProperties.ENCRYPTION_KMS_IMPL)) {
this.keyManagementClient = EncryptionUtil.createKmsClient(mergedProps);
this.closeables.addCloseable(this.keyManagementClient);
}

this.namespaceSeparator =
PropertyUtil.propertyAsString(
mergedProps,
Expand Down Expand Up @@ -471,6 +480,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
Map::of,
mutationHeaders,
tableFileIO(context, tableConf, response.credentials()),
keyManagementClient,
tableMetadata,
endpoints);

Expand Down Expand Up @@ -551,6 +561,7 @@ public Table registerTable(
Map::of,
mutationHeaders,
tableFileIO(context, tableConf, response.credentials()),
keyManagementClient,
response.tableMetadata(),
endpoints);

Expand Down Expand Up @@ -815,6 +826,7 @@ public Table create() {
Map::of,
mutationHeaders,
tableFileIO(context, tableConf, response.credentials()),
keyManagementClient,
response.tableMetadata(),
endpoints);

Expand Down Expand Up @@ -843,6 +855,7 @@ public Transaction createTransaction() {
Map::of,
mutationHeaders,
tableFileIO(context, tableConf, response.credentials()),
keyManagementClient,
RESTTableOperations.UpdateType.CREATE,
createChanges(meta),
meta,
Expand Down Expand Up @@ -907,6 +920,7 @@ public Transaction replaceTransaction() {
Map::of,
mutationHeaders,
tableFileIO(context, tableConf, response.credentials()),
keyManagementClient,
RESTTableOperations.UpdateType.REPLACE,
changes.build(),
base,
Expand Down Expand Up @@ -1047,6 +1061,7 @@ private FileIO tableFileIO(
* @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation
* requests (POST/DELETE)
* @param fileIO the FileIO implementation for reading and writing table metadata and data files
* @param kmsClient the {@link KeyManagementClient} for encrypted tables
* @param current the current table metadata
* @param supportedEndpoints the set of supported REST endpoints
* @return a new RESTTableOperations instance
Expand All @@ -1057,10 +1072,18 @@ protected RESTTableOperations newTableOps(
Supplier<Map<String, String>> readHeaders,
Supplier<Map<String, String>> mutationHeaderSupplier,
FileIO fileIO,
KeyManagementClient kmsClient,
TableMetadata current,
Set<Endpoint> supportedEndpoints) {
return new RESTTableOperations(
restClient, path, readHeaders, mutationHeaderSupplier, fileIO, current, supportedEndpoints);
restClient,
path,
readHeaders,
mutationHeaderSupplier,
fileIO,
kmsClient,
current,
supportedEndpoints);
}

/**
Expand All @@ -1077,6 +1100,7 @@ protected RESTTableOperations newTableOps(
* @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation
* requests (POST/DELETE)
* @param fileIO the FileIO implementation for reading and writing table metadata and data files
* @param kmsClient the {@link KeyManagementClient} for encrypted tables
* @param updateType the {@link RESTTableOperations.UpdateType} being performed
* @param createChanges the list of metadata updates to apply during table creation or replacement
* @param current the current table metadata (may be null for CREATE operations)
Expand All @@ -1089,6 +1113,7 @@ protected RESTTableOperations newTableOps(
Supplier<Map<String, String>> readHeaders,
Supplier<Map<String, String>> mutationHeaderSupplier,
FileIO fileIO,
KeyManagementClient kmsClient,
RESTTableOperations.UpdateType updateType,
List<MetadataUpdate> createChanges,
TableMetadata current,
Expand All @@ -1099,6 +1124,7 @@ protected RESTTableOperations newTableOps(
readHeaders,
mutationHeaderSupplier,
fileIO,
kmsClient,
updateType,
createChanges,
current,
Expand Down
Loading
Loading