From afe30fae6819cff23f4acb739a57fb09e4c3b144 Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Thu, 7 Aug 2025 16:25:54 -0700 Subject: [PATCH 01/16] Modularize calls to federated catalogs --- gradle/projects.main.properties | 1 + runtime/federator/build.gradle.kts | 59 +++++++++ .../federator/FederatedCatalogFactory.java | 114 ++++++++++++++++++ runtime/service/build.gradle.kts | 1 + .../iceberg/IcebergCatalogHandler.java | 40 +----- 5 files changed, 179 insertions(+), 36 deletions(-) create mode 100644 runtime/federator/build.gradle.kts create mode 100644 runtime/federator/src/main/java/org/apache/polaris/federator/FederatedCatalogFactory.java diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 39ab227411..b867182d0b 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -42,6 +42,7 @@ polaris-minio-testcontainer=tools/minio-testcontainer polaris-version=tools/version polaris-misc-types=tools/misc-types polaris-persistence-varint=nosql/persistence/varint +polaris-federator=runtime/federator polaris-config-docs-annotations=tools/config-docs/annotations polaris-config-docs-generator=tools/config-docs/generator diff --git a/runtime/federator/build.gradle.kts b/runtime/federator/build.gradle.kts new file mode 100644 index 0000000000..af251cf260 --- /dev/null +++ b/runtime/federator/build.gradle.kts @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("polaris-client") + alias(libs.plugins.jandex) +} + +dependencies { + // Polaris dependencies + implementation(project(":polaris-core")) + + implementation(platform(libs.iceberg.bom)) + implementation("org.apache.iceberg:iceberg-api") + implementation("org.apache.iceberg:iceberg-core") + implementation("org.apache.iceberg:iceberg-common") + + // Hadoop dependencies (for Hadoop catalog support) + implementation(libs.hadoop.common) { + exclude("org.slf4j", "slf4j-reload4j") + exclude("org.slf4j", "slf4j-log4j12") + exclude("ch.qos.reload4j", "reload4j") + exclude("log4j", "log4j") + exclude("org.apache.zookeeper", "zookeeper") + exclude("org.apache.hadoop.thirdparty", "hadoop-shaded-protobuf_3_25") + exclude("com.github.pjfanning", "jersey-json") + exclude("com.sun.jersey", "jersey-core") + exclude("com.sun.jersey", "jersey-server") + exclude("com.sun.jersey", "jersey-servlet") + exclude("io.dropwizard.metrics", "metrics-core") + } + implementation(libs.hadoop.client.api) + implementation(libs.hadoop.client.runtime) + + // Logging + implementation(libs.slf4j.api) +} + +java { toolchain { languageVersion.set(JavaLanguageVersion.of(21)) } } + +tasks.withType { options.encoding = "UTF-8" } + +tasks.test { useJUnitPlatform() } diff --git a/runtime/federator/src/main/java/org/apache/polaris/federator/FederatedCatalogFactory.java b/runtime/federator/src/main/java/org/apache/polaris/federator/FederatedCatalogFactory.java new file mode 100644 index 0000000000..02aaf2ba02 --- /dev/null +++ b/runtime/federator/src/main/java/org/apache/polaris/federator/FederatedCatalogFactory.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.federator; + +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.rest.HTTPClient; +import org.apache.iceberg.rest.RESTCatalog; +import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; +import org.apache.polaris.core.connection.ConnectionType; +import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo; +import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo; +import org.apache.polaris.core.secrets.UserSecretsManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory class for creating federated catalogs based on connection configuration. Currently + * supports Iceberg REST and Hadoop catalogs. + */ +public class FederatedCatalogFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(FederatedCatalogFactory.class); + + /** + * Creates a federated catalog based on the provided connection configuration. + * + * @param connectionConfigInfoDpo The connection configuration + * @param userSecretsManager The user secrets manager for handling credentials + * @return The initialized federated catalog + * @throws UnsupportedOperationException if the connection type is not supported + */ + public static Catalog createFederatedCatalog( + ConnectionConfigInfoDpo connectionConfigInfoDpo, UserSecretsManager userSecretsManager) { + + ConnectionType connectionType = + ConnectionType.fromCode(connectionConfigInfoDpo.getConnectionTypeCode()); + + LOGGER.info("Creating federated catalog for connection type: {}", connectionType); + + Catalog federatedCatalog; + + switch (connectionType) { + case ICEBERG_REST: + federatedCatalog = createIcebergRestCatalog(connectionConfigInfoDpo, userSecretsManager); + break; + case HADOOP: + federatedCatalog = createHadoopCatalog(connectionConfigInfoDpo, userSecretsManager); + break; + default: + throw new UnsupportedOperationException( + "Connection type not supported for federation: " + connectionType); + } + + return federatedCatalog; + } + + private static Catalog createIcebergRestCatalog( + ConnectionConfigInfoDpo connectionConfigInfoDpo, UserSecretsManager userSecretsManager) { + SessionCatalog.SessionContext context = SessionCatalog.SessionContext.createEmpty(); + RESTCatalog restCatalog = + new RESTCatalog( + context, + (config) -> + HTTPClient.builder(config) + .uri(config.get(org.apache.iceberg.CatalogProperties.URI)) + .build()); + + IcebergRestConnectionConfigInfoDpo icebergRestConfig = + (IcebergRestConnectionConfigInfoDpo) connectionConfigInfoDpo; + + restCatalog.initialize( + icebergRestConfig.getRemoteCatalogName(), + connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager)); + + LOGGER.info( + "Initialized Iceberg REST catalog for remote catalog: {}", + icebergRestConfig.getRemoteCatalogName()); + + return restCatalog; + } + + private static Catalog createHadoopCatalog( + ConnectionConfigInfoDpo connectionConfigInfoDpo, UserSecretsManager userSecretsManager) { + HadoopCatalog hadoopCatalog = new HadoopCatalog(); + + HadoopConnectionConfigInfoDpo hadoopConfig = + (HadoopConnectionConfigInfoDpo) connectionConfigInfoDpo; + + hadoopCatalog.initialize( + hadoopConfig.getWarehouse(), + connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager)); + + LOGGER.info("Initialized Hadoop catalog for warehouse: {}", hadoopConfig.getWarehouse()); + + return hadoopCatalog; + } +} diff --git a/runtime/service/build.gradle.kts b/runtime/service/build.gradle.kts index 037776c404..57ec03f252 100644 --- a/runtime/service/build.gradle.kts +++ b/runtime/service/build.gradle.kts @@ -30,6 +30,7 @@ dependencies { implementation(project(":polaris-api-management-service")) implementation(project(":polaris-api-iceberg-service")) implementation(project(":polaris-api-catalog-service")) + implementation(project(":polaris-federator")) runtimeOnly(project(":polaris-relational-jdbc")) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 90b019a654..b52784418b 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -45,7 +45,6 @@ import org.apache.iceberg.UpdateRequirement; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.catalog.ViewCatalog; @@ -54,9 +53,6 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.iceberg.rest.HTTPClient; -import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.rest.credentials.ImmutableCredential; import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; @@ -77,9 +73,6 @@ import org.apache.polaris.core.auth.PolarisAuthorizer; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; -import org.apache.polaris.core.connection.ConnectionType; -import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo; -import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.CatalogEntity; import org.apache.polaris.core.entity.PolarisEntitySubType; @@ -95,6 +88,7 @@ import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.federator.FederatedCatalogFactory; import org.apache.polaris.service.catalog.SupportsNotifications; import org.apache.polaris.service.catalog.common.CatalogHandler; import org.apache.polaris.service.config.ReservedProperties; @@ -213,35 +207,9 @@ protected void initializeCatalog() { FeatureConfiguration.enforceFeatureEnabledOrThrow( callContext, FeatureConfiguration.ENABLE_CATALOG_FEDERATION); - Catalog federatedCatalog; - ConnectionType connectionType = - ConnectionType.fromCode(connectionConfigInfoDpo.getConnectionTypeCode()); - - switch (connectionType) { - case ICEBERG_REST: - SessionCatalog.SessionContext context = SessionCatalog.SessionContext.createEmpty(); - federatedCatalog = - new RESTCatalog( - context, - (config) -> - HTTPClient.builder(config) - .uri(config.get(org.apache.iceberg.CatalogProperties.URI)) - .build()); - federatedCatalog.initialize( - ((IcebergRestConnectionConfigInfoDpo) connectionConfigInfoDpo).getRemoteCatalogName(), - connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager())); - break; - case HADOOP: - federatedCatalog = new HadoopCatalog(); - federatedCatalog.initialize( - ((HadoopConnectionConfigInfoDpo) connectionConfigInfoDpo).getWarehouse(), - connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager())); - break; - default: - throw new UnsupportedOperationException( - "Connection type not supported: " + connectionType); - } - this.baseCatalog = federatedCatalog; + this.baseCatalog = + FederatedCatalogFactory.createFederatedCatalog( + connectionConfigInfoDpo, getUserSecretsManager()); } else { LOGGER.atInfo().log("Initializing non-federated catalog"); this.baseCatalog = From 63f25b1c2c4db2e2c864cbdd1205866f6b9d864d Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Thu, 7 Aug 2025 16:47:16 -0700 Subject: [PATCH 02/16] Remove verbose logs --- .../apache/polaris/federator/FederatedCatalogFactory.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/runtime/federator/src/main/java/org/apache/polaris/federator/FederatedCatalogFactory.java b/runtime/federator/src/main/java/org/apache/polaris/federator/FederatedCatalogFactory.java index 02aaf2ba02..9597d68548 100644 --- a/runtime/federator/src/main/java/org/apache/polaris/federator/FederatedCatalogFactory.java +++ b/runtime/federator/src/main/java/org/apache/polaris/federator/FederatedCatalogFactory.java @@ -52,8 +52,6 @@ public static Catalog createFederatedCatalog( ConnectionType connectionType = ConnectionType.fromCode(connectionConfigInfoDpo.getConnectionTypeCode()); - LOGGER.info("Creating federated catalog for connection type: {}", connectionType); - Catalog federatedCatalog; switch (connectionType) { @@ -89,10 +87,6 @@ private static Catalog createIcebergRestCatalog( icebergRestConfig.getRemoteCatalogName(), connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager)); - LOGGER.info( - "Initialized Iceberg REST catalog for remote catalog: {}", - icebergRestConfig.getRemoteCatalogName()); - return restCatalog; } @@ -107,8 +101,6 @@ private static Catalog createHadoopCatalog( hadoopConfig.getWarehouse(), connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager)); - LOGGER.info("Initialized Hadoop catalog for warehouse: {}", hadoopConfig.getWarehouse()); - return hadoopCatalog; } } From 5cafd63966a5304fdd546e2eb3759066c7bb3496 Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Fri, 8 Aug 2025 11:08:19 -0700 Subject: [PATCH 03/16] Separate modules per non-irc catalog --- .../federation/hadoop}/build.gradle.kts | 0 .../hadoop/HadoopFederatedCatalogFactory.java | 58 ++++++++++ gradle/projects.main.properties | 2 +- .../federator/FederatedCatalogFactory.java | 106 ------------------ runtime/service/build.gradle.kts | 2 +- .../iceberg/IcebergCatalogHandler.java | 36 +++++- 6 files changed, 91 insertions(+), 113 deletions(-) rename {runtime/federator => extensions/federation/hadoop}/build.gradle.kts (100%) create mode 100644 extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java delete mode 100644 runtime/federator/src/main/java/org/apache/polaris/federator/FederatedCatalogFactory.java diff --git a/runtime/federator/build.gradle.kts b/extensions/federation/hadoop/build.gradle.kts similarity index 100% rename from runtime/federator/build.gradle.kts rename to extensions/federation/hadoop/build.gradle.kts diff --git a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java new file mode 100644 index 0000000000..821ae89fd2 --- /dev/null +++ b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extensions.federation.hadoop; + +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; +import org.apache.polaris.core.connection.ConnectionType; +import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo; +import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo; +import org.apache.polaris.core.secrets.UserSecretsManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory class for creating federated hadoop catalogs based on connection configuration. + */ +public class HadoopFederatedCatalogFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFederatedCatalogFactory.class); + + /** + * Creates a federated catalog based on the provided connection configuration. + * + * @param connectionConfigInfoDpo The connection configuration + * @param userSecretsManager The user secrets manager for handling credentials + * @return The initialized hadoop catalog + */ + public static Catalog createHadoopCatalog( + ConnectionConfigInfoDpo connectionConfigInfoDpo, UserSecretsManager userSecretsManager) { + HadoopCatalog hadoopCatalog = new HadoopCatalog(); + + HadoopConnectionConfigInfoDpo hadoopConfig = + (HadoopConnectionConfigInfoDpo) connectionConfigInfoDpo; + + hadoopCatalog.initialize( + hadoopConfig.getWarehouse(), + connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager)); + + return hadoopCatalog; + } +} diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index b867182d0b..1b74232b59 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -42,7 +42,7 @@ polaris-minio-testcontainer=tools/minio-testcontainer polaris-version=tools/version polaris-misc-types=tools/misc-types polaris-persistence-varint=nosql/persistence/varint -polaris-federator=runtime/federator +polaris-extensions-federation-hadoop=extensions/federation/hadoop polaris-config-docs-annotations=tools/config-docs/annotations polaris-config-docs-generator=tools/config-docs/generator diff --git a/runtime/federator/src/main/java/org/apache/polaris/federator/FederatedCatalogFactory.java b/runtime/federator/src/main/java/org/apache/polaris/federator/FederatedCatalogFactory.java deleted file mode 100644 index 9597d68548..0000000000 --- a/runtime/federator/src/main/java/org/apache/polaris/federator/FederatedCatalogFactory.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.federator; - -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.SessionCatalog; -import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.iceberg.rest.HTTPClient; -import org.apache.iceberg.rest.RESTCatalog; -import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; -import org.apache.polaris.core.connection.ConnectionType; -import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo; -import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo; -import org.apache.polaris.core.secrets.UserSecretsManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Factory class for creating federated catalogs based on connection configuration. Currently - * supports Iceberg REST and Hadoop catalogs. - */ -public class FederatedCatalogFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(FederatedCatalogFactory.class); - - /** - * Creates a federated catalog based on the provided connection configuration. - * - * @param connectionConfigInfoDpo The connection configuration - * @param userSecretsManager The user secrets manager for handling credentials - * @return The initialized federated catalog - * @throws UnsupportedOperationException if the connection type is not supported - */ - public static Catalog createFederatedCatalog( - ConnectionConfigInfoDpo connectionConfigInfoDpo, UserSecretsManager userSecretsManager) { - - ConnectionType connectionType = - ConnectionType.fromCode(connectionConfigInfoDpo.getConnectionTypeCode()); - - Catalog federatedCatalog; - - switch (connectionType) { - case ICEBERG_REST: - federatedCatalog = createIcebergRestCatalog(connectionConfigInfoDpo, userSecretsManager); - break; - case HADOOP: - federatedCatalog = createHadoopCatalog(connectionConfigInfoDpo, userSecretsManager); - break; - default: - throw new UnsupportedOperationException( - "Connection type not supported for federation: " + connectionType); - } - - return federatedCatalog; - } - - private static Catalog createIcebergRestCatalog( - ConnectionConfigInfoDpo connectionConfigInfoDpo, UserSecretsManager userSecretsManager) { - SessionCatalog.SessionContext context = SessionCatalog.SessionContext.createEmpty(); - RESTCatalog restCatalog = - new RESTCatalog( - context, - (config) -> - HTTPClient.builder(config) - .uri(config.get(org.apache.iceberg.CatalogProperties.URI)) - .build()); - - IcebergRestConnectionConfigInfoDpo icebergRestConfig = - (IcebergRestConnectionConfigInfoDpo) connectionConfigInfoDpo; - - restCatalog.initialize( - icebergRestConfig.getRemoteCatalogName(), - connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager)); - - return restCatalog; - } - - private static Catalog createHadoopCatalog( - ConnectionConfigInfoDpo connectionConfigInfoDpo, UserSecretsManager userSecretsManager) { - HadoopCatalog hadoopCatalog = new HadoopCatalog(); - - HadoopConnectionConfigInfoDpo hadoopConfig = - (HadoopConnectionConfigInfoDpo) connectionConfigInfoDpo; - - hadoopCatalog.initialize( - hadoopConfig.getWarehouse(), - connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager)); - - return hadoopCatalog; - } -} diff --git a/runtime/service/build.gradle.kts b/runtime/service/build.gradle.kts index 57ec03f252..1dc0c8eb47 100644 --- a/runtime/service/build.gradle.kts +++ b/runtime/service/build.gradle.kts @@ -30,7 +30,7 @@ dependencies { implementation(project(":polaris-api-management-service")) implementation(project(":polaris-api-iceberg-service")) implementation(project(":polaris-api-catalog-service")) - implementation(project(":polaris-federator")) + implementation(project(":polaris-extensions-federation-hadoop")) runtimeOnly(project(":polaris-relational-jdbc")) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index b52784418b..a38329d69f 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -45,6 +45,7 @@ import org.apache.iceberg.UpdateRequirement; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.catalog.ViewCatalog; @@ -53,6 +54,8 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.rest.HTTPClient; +import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.rest.credentials.ImmutableCredential; import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; @@ -73,6 +76,8 @@ import org.apache.polaris.core.auth.PolarisAuthorizer; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; +import org.apache.polaris.core.connection.ConnectionType; +import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.CatalogEntity; import org.apache.polaris.core.entity.PolarisEntitySubType; @@ -88,7 +93,7 @@ import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.PolarisStorageActions; -import org.apache.polaris.federator.FederatedCatalogFactory; +import org.apache.polaris.extensions.federation.hadoop.HadoopFederatedCatalogFactory; import org.apache.polaris.service.catalog.SupportsNotifications; import org.apache.polaris.service.catalog.common.CatalogHandler; import org.apache.polaris.service.config.ReservedProperties; @@ -206,10 +211,31 @@ protected void initializeCatalog() { .log("Initializing federated catalog"); FeatureConfiguration.enforceFeatureEnabledOrThrow( callContext, FeatureConfiguration.ENABLE_CATALOG_FEDERATION); - - this.baseCatalog = - FederatedCatalogFactory.createFederatedCatalog( - connectionConfigInfoDpo, getUserSecretsManager()); + + Catalog federatedCatalog; + ConnectionType connectionType = + ConnectionType.fromCode(connectionConfigInfoDpo.getConnectionTypeCode()); + switch (connectionType) { + case ICEBERG_REST: + SessionCatalog.SessionContext context = SessionCatalog.SessionContext.createEmpty(); + federatedCatalog = + new RESTCatalog( + context, + (config) -> + HTTPClient.builder(config) + .uri(config.get(org.apache.iceberg.CatalogProperties.URI)) + .build()); + federatedCatalog.initialize( + ((IcebergRestConnectionConfigInfoDpo) connectionConfigInfoDpo).getRemoteCatalogName(), + connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager())); + break; + case HADOOP: + federatedCatalog = HadoopFederatedCatalogFactory.createHadoopCatalog(connectionConfigInfoDpo, getUserSecretsManager()); + break; + default: + throw new UnsupportedOperationException("Unsupported connection type: " + connectionType); + } + this.baseCatalog = federatedCatalog; } else { LOGGER.atInfo().log("Initializing non-federated catalog"); this.baseCatalog = From 91e18073851e07744cc83552c20286baeafeef58 Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Fri, 8 Aug 2025 11:38:05 -0700 Subject: [PATCH 04/16] spotlesApply --- .../hadoop/HadoopFederatedCatalogFactory.java | 16 ++++------------ .../catalog/iceberg/IcebergCatalogHandler.java | 8 ++++---- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java index cceff257fe..438f837173 100644 --- a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java +++ b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java @@ -20,21 +20,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.polaris.core.connection.AuthenticationParametersDpo; import org.apache.polaris.core.connection.AuthenticationType; import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; -import org.apache.polaris.core.connection.ConnectionType; import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo; -import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo; import org.apache.polaris.core.secrets.UserSecretsManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * Factory class for creating federated hadoop catalogs based on connection configuration. - */ +/** Factory class for creating federated hadoop catalogs based on connection configuration. */ public class HadoopFederatedCatalogFactory { private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFederatedCatalogFactory.class); @@ -54,16 +49,13 @@ public static Catalog createHadoopCatalog( connectionConfigInfoDpo.getAuthenticationParameters(); if (authenticationParametersDpo.getAuthenticationTypeCode() != AuthenticationType.IMPLICIT.getCode()) { - throw new IllegalStateException( - "Hadoop federation only supports IMPLICIT authentication."); + throw new IllegalStateException("Hadoop federation only supports IMPLICIT authentication."); } Configuration conf = new Configuration(); - String warehouse = - ((HadoopConnectionConfigInfoDpo) connectionConfigInfoDpo).getWarehouse(); + String warehouse = ((HadoopConnectionConfigInfoDpo) connectionConfigInfoDpo).getWarehouse(); HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, warehouse); hadoopCatalog.initialize( - warehouse, - connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager)); + warehouse, connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager)); return hadoopCatalog; } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index d97697f94e..a699d5d2bf 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -75,8 +75,6 @@ import org.apache.polaris.core.auth.PolarisAuthorizableOperation; import org.apache.polaris.core.auth.PolarisAuthorizer; import org.apache.polaris.core.config.FeatureConfiguration; -import org.apache.polaris.core.connection.AuthenticationParametersDpo; -import org.apache.polaris.core.connection.AuthenticationType; import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; import org.apache.polaris.core.connection.ConnectionType; import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo; @@ -213,7 +211,7 @@ protected void initializeCatalog() { .log("Initializing federated catalog"); FeatureConfiguration.enforceFeatureEnabledOrThrow( callContext, FeatureConfiguration.ENABLE_CATALOG_FEDERATION); - + Catalog federatedCatalog; ConnectionType connectionType = ConnectionType.fromCode(connectionConfigInfoDpo.getConnectionTypeCode()); @@ -232,7 +230,9 @@ protected void initializeCatalog() { connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager())); break; case HADOOP: - federatedCatalog = HadoopFederatedCatalogFactory.createHadoopCatalog(connectionConfigInfoDpo, getUserSecretsManager()); + federatedCatalog = + HadoopFederatedCatalogFactory.createHadoopCatalog( + connectionConfigInfoDpo, getUserSecretsManager()); break; default: throw new UnsupportedOperationException("Unsupported connection type: " + connectionType); From e036e9fce3356091e1a2775d77f4913316ae6dbf Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Fri, 8 Aug 2025 15:41:59 -0700 Subject: [PATCH 05/16] Modified the polaris-extensions-federation-hadoop dependency to by runtimOnly --- extensions/federation/hadoop/build.gradle.kts | 4 ++ .../hadoop/HadoopFederatedCatalogFactory.java | 19 ++++---- .../core/catalog/NonRestCatalogFactory.java | 43 +++++++++++++++++++ runtime/service/build.gradle.kts | 2 +- .../iceberg/IcebergCatalogAdapter.java | 11 ++++- .../iceberg/IcebergCatalogHandler.java | 22 +++++++--- .../IcebergCatalogHandlerAuthzTest.java | 19 ++++++-- .../apache/polaris/service/TestServices.java | 9 +++- 8 files changed, 107 insertions(+), 22 deletions(-) create mode 100644 polaris-core/src/main/java/org/apache/polaris/core/catalog/NonRestCatalogFactory.java diff --git a/extensions/federation/hadoop/build.gradle.kts b/extensions/federation/hadoop/build.gradle.kts index af251cf260..9e06917ce2 100644 --- a/extensions/federation/hadoop/build.gradle.kts +++ b/extensions/federation/hadoop/build.gradle.kts @@ -48,6 +48,10 @@ dependencies { implementation(libs.hadoop.client.api) implementation(libs.hadoop.client.runtime) + // CDI dependencies for runtime discovery + implementation(libs.jakarta.enterprise.cdi.api) + implementation(libs.smallrye.common.annotation) + // Logging implementation(libs.slf4j.api) } diff --git a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java index 438f837173..ae5bba341a 100644 --- a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java +++ b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java @@ -18,9 +18,12 @@ */ package org.apache.polaris.extensions.federation.hadoop; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.polaris.core.catalog.NonRESTCatalogFactory; import org.apache.polaris.core.connection.AuthenticationParametersDpo; import org.apache.polaris.core.connection.AuthenticationType; import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; @@ -29,18 +32,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Factory class for creating federated hadoop catalogs based on connection configuration. */ -public class HadoopFederatedCatalogFactory { +/** Factory class for creating a Hadoop catalog handle based on connection configuration. */ +@ApplicationScoped +@Identifier("hadoop") +public class HadoopFederatedCatalogFactory implements NonRESTCatalogFactory { private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFederatedCatalogFactory.class); - /** - * Creates a federated catalog based on the provided connection configuration. - * - * @param connectionConfigInfoDpo The connection configuration - * @param userSecretsManager The user secrets manager for handling credentials - * @return The initialized hadoop catalog - */ - public static Catalog createHadoopCatalog( + @Override + public Catalog createCatalog( ConnectionConfigInfoDpo connectionConfigInfoDpo, UserSecretsManager userSecretsManager) { // Currently, Polaris supports Hadoop federation only via IMPLICIT authentication. // Hence, prior to initializing the configuration, ensure that the catalog uses diff --git a/polaris-core/src/main/java/org/apache/polaris/core/catalog/NonRestCatalogFactory.java b/polaris-core/src/main/java/org/apache/polaris/core/catalog/NonRestCatalogFactory.java new file mode 100644 index 0000000000..0607507479 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/catalog/NonRestCatalogFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.catalog; + +import org.apache.iceberg.catalog.Catalog; +import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; +import org.apache.polaris.core.secrets.UserSecretsManager; + +/** + * Factory interface for creating non-REST catalog handles based on connection configuration. + * + *

Implementations should be annotated with CDI annotations and use the @Identifier annotation to + * specify which connection type they support. + */ +public interface NonRESTCatalogFactory { + + /** + * Creates a catalog hadnle for the given non-REST connection configuration. + * + * @param connectionConfig the connection configuration + * @param userSecretsManager the user secrets manager for handling credentials + * @return the initialized catalog + * @throws IllegalStateException if the connection configuration is invalid + */ + Catalog createCatalog( + ConnectionConfigInfoDpo connectionConfig, UserSecretsManager userSecretsManager); +} diff --git a/runtime/service/build.gradle.kts b/runtime/service/build.gradle.kts index 1dc0c8eb47..8400b4b52d 100644 --- a/runtime/service/build.gradle.kts +++ b/runtime/service/build.gradle.kts @@ -30,7 +30,7 @@ dependencies { implementation(project(":polaris-api-management-service")) implementation(project(":polaris-api-iceberg-service")) implementation(project(":polaris-api-catalog-service")) - implementation(project(":polaris-extensions-federation-hadoop")) + runtimeOnly(project(":polaris-extensions-federation-hadoop")) runtimeOnly(project(":polaris-relational-jdbc")) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index 99aea64d9b..1ea1358011 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -27,6 +27,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import jakarta.enterprise.context.RequestScoped; +import jakarta.enterprise.inject.Any; +import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; import jakarta.ws.rs.WebApplicationException; import jakarta.ws.rs.core.HttpHeaders; @@ -61,6 +63,7 @@ import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; import org.apache.polaris.core.auth.PolarisAuthorizer; +import org.apache.polaris.core.catalog.NonRESTCatalogFactory; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.PolarisEntity; @@ -144,6 +147,7 @@ public class IcebergCatalogAdapter private final CatalogPrefixParser prefixParser; private final ReservedProperties reservedProperties; private final CatalogHandlerUtils catalogHandlerUtils; + private final Instance nonRESTCatalogFactories; @Inject public IcebergCatalogAdapter( @@ -157,7 +161,8 @@ public IcebergCatalogAdapter( PolarisAuthorizer polarisAuthorizer, CatalogPrefixParser prefixParser, ReservedProperties reservedProperties, - CatalogHandlerUtils catalogHandlerUtils) { + CatalogHandlerUtils catalogHandlerUtils, + @Any Instance nonRESTCatalogFactories) { this.realmContext = realmContext; this.callContext = callContext; this.catalogFactory = catalogFactory; @@ -169,6 +174,7 @@ public IcebergCatalogAdapter( this.prefixParser = prefixParser; this.reservedProperties = reservedProperties; this.catalogHandlerUtils = catalogHandlerUtils; + this.nonRESTCatalogFactories = nonRESTCatalogFactories; } /** @@ -205,7 +211,8 @@ IcebergCatalogHandler newHandlerWrapper(SecurityContext securityContext, String catalogName, polarisAuthorizer, reservedProperties, - catalogHandlerUtils); + catalogHandlerUtils, + nonRESTCatalogFactories); } @Override diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index a699d5d2bf..dbbe4165f5 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -20,7 +20,9 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import io.smallrye.common.annotation.Identifier; import jakarta.annotation.Nonnull; +import jakarta.enterprise.inject.Instance; import jakarta.ws.rs.core.SecurityContext; import java.io.Closeable; import java.time.OffsetDateTime; @@ -74,6 +76,7 @@ import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; import org.apache.polaris.core.auth.PolarisAuthorizableOperation; import org.apache.polaris.core.auth.PolarisAuthorizer; +import org.apache.polaris.core.catalog.NonRESTCatalogFactory; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; import org.apache.polaris.core.connection.ConnectionType; @@ -93,7 +96,6 @@ import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.PolarisStorageActions; -import org.apache.polaris.extensions.federation.hadoop.HadoopFederatedCatalogFactory; import org.apache.polaris.service.catalog.SupportsNotifications; import org.apache.polaris.service.catalog.common.CatalogHandler; import org.apache.polaris.service.config.ReservedProperties; @@ -128,6 +130,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab private final ReservedProperties reservedProperties; private final CatalogHandlerUtils catalogHandlerUtils; + private final Instance nonRESTCatalogFactories; + // Catalog instance will be initialized after authorizing resolver successfully resolves // the catalog entity. protected Catalog baseCatalog = null; @@ -147,13 +151,15 @@ public IcebergCatalogHandler( String catalogName, PolarisAuthorizer authorizer, ReservedProperties reservedProperties, - CatalogHandlerUtils catalogHandlerUtils) { + CatalogHandlerUtils catalogHandlerUtils, + Instance nonRESTCatalogFactories) { super(callContext, resolutionManifestFactory, securityContext, catalogName, authorizer); this.metaStoreManager = metaStoreManager; this.userSecretsManager = userSecretsManager; this.catalogFactory = catalogFactory; this.reservedProperties = reservedProperties; this.catalogHandlerUtils = catalogHandlerUtils; + this.nonRESTCatalogFactories = nonRESTCatalogFactories; } /** @@ -230,9 +236,15 @@ protected void initializeCatalog() { connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager())); break; case HADOOP: - federatedCatalog = - HadoopFederatedCatalogFactory.createHadoopCatalog( - connectionConfigInfoDpo, getUserSecretsManager()); + // Use CDI to select the Hadoop federation factory at runtime + Instance hadoopFactory = + nonRESTCatalogFactories.select(Identifier.Literal.of("hadoop")); + if (!hadoopFactory.isUnsatisfied()) { + federatedCatalog = + hadoopFactory.get().createCatalog(connectionConfigInfoDpo, getUserSecretsManager()); + } else { + throw new UnsupportedOperationException("Hadoop federation factory unavailable."); + } break; default: throw new UnsupportedOperationException("Unsupported connection type: " + connectionType); diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java index ea963784b3..bb58b3fbe0 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; +import jakarta.enterprise.inject.Instance; import jakarta.ws.rs.core.SecurityContext; import java.time.Instant; import java.util.List; @@ -55,6 +56,7 @@ import org.apache.polaris.core.admin.model.PrincipalWithCredentialsCredentials; import org.apache.polaris.core.admin.model.StorageConfigInfo; import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; +import org.apache.polaris.core.catalog.NonRESTCatalogFactory; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.CatalogEntity; import org.apache.polaris.core.entity.CatalogRoleEntity; @@ -79,6 +81,14 @@ @TestProfile(PolarisAuthzTestBase.Profile.class) public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { + @SuppressWarnings("unchecked") + private static Instance emptyNonRESTCatalogFactory() { + Instance mock = Mockito.mock(Instance.class); + Mockito.when(mock.select(Mockito.any())).thenReturn(mock); + Mockito.when(mock.isUnsatisfied()).thenReturn(true); + return mock; + } + private IcebergCatalogHandler newWrapper() { return newWrapper(Set.of()); } @@ -101,7 +111,8 @@ private IcebergCatalogHandler newWrapper( catalogName, polarisAuthorizer, reservedProperties, - catalogHandlerUtils); + catalogHandlerUtils, + emptyNonRESTCatalogFactory()); } /** @@ -242,7 +253,8 @@ public void testInsufficientPermissionsPriorToSecretRotation() { CATALOG_NAME, polarisAuthorizer, reservedProperties, - catalogHandlerUtils); + catalogHandlerUtils, + emptyNonRESTCatalogFactory()); // a variety of actions are all disallowed because the principal's credentials must be rotated doTestInsufficientPrivileges( @@ -277,7 +289,8 @@ public void testInsufficientPermissionsPriorToSecretRotation() { CATALOG_NAME, polarisAuthorizer, reservedProperties, - catalogHandlerUtils); + catalogHandlerUtils, + emptyNonRESTCatalogFactory()); doTestSufficientPrivilegeSets( List.of(Set.of(PolarisPrivilege.NAMESPACE_LIST)), diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java index d8ec777889..493049ed2c 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -22,6 +22,7 @@ import com.google.auth.oauth2.GoogleCredentials; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; +import jakarta.enterprise.inject.Instance; import jakarta.ws.rs.core.SecurityContext; import java.security.Principal; import java.time.Clock; @@ -36,6 +37,7 @@ import org.apache.polaris.core.PolarisDiagnostics; import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; import org.apache.polaris.core.auth.PolarisAuthorizer; +import org.apache.polaris.core.catalog.NonRESTCatalogFactory; import org.apache.polaris.core.config.PolarisConfigurationStore; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; @@ -215,6 +217,10 @@ public TestServices build() { CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(callContext.getRealmConfig()); + Instance nonRESTCatalogFactory = Mockito.mock(Instance.class); + Mockito.when(nonRESTCatalogFactory.select(Mockito.any())).thenReturn(nonRESTCatalogFactory); + Mockito.when(nonRESTCatalogFactory.isUnsatisfied()).thenReturn(true); + IcebergCatalogAdapter catalogService = new IcebergCatalogAdapter( realmContext, @@ -227,7 +233,8 @@ public TestServices build() { authorizer, new DefaultCatalogPrefixParser(), reservedProperties, - catalogHandlerUtils); + catalogHandlerUtils, + nonRESTCatalogFactory); IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(catalogService); IcebergRestConfigurationApi restConfigurationApi = From 2c4aa584653edccc53b51c4c8fb3d496fe4b42bc Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Fri, 8 Aug 2025 16:23:15 -0700 Subject: [PATCH 06/16] Rename --- .../{NonRestCatalogFactory.java => NonRESTCatalogFactory.java} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename polaris-core/src/main/java/org/apache/polaris/core/catalog/{NonRestCatalogFactory.java => NonRESTCatalogFactory.java} (100%) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/catalog/NonRestCatalogFactory.java b/polaris-core/src/main/java/org/apache/polaris/core/catalog/NonRESTCatalogFactory.java similarity index 100% rename from polaris-core/src/main/java/org/apache/polaris/core/catalog/NonRestCatalogFactory.java rename to polaris-core/src/main/java/org/apache/polaris/core/catalog/NonRESTCatalogFactory.java From c73966cda2d73396aeda70738b1b7133a545ba24 Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Fri, 8 Aug 2025 17:02:13 -0700 Subject: [PATCH 07/16] Make the runtime dependency flag dependent --- runtime/service/build.gradle.kts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/runtime/service/build.gradle.kts b/runtime/service/build.gradle.kts index 8400b4b52d..dbaf602196 100644 --- a/runtime/service/build.gradle.kts +++ b/runtime/service/build.gradle.kts @@ -30,7 +30,12 @@ dependencies { implementation(project(":polaris-api-management-service")) implementation(project(":polaris-api-iceberg-service")) implementation(project(":polaris-api-catalog-service")) - runtimeOnly(project(":polaris-extensions-federation-hadoop")) + if ( + (project.findProperty("NonRESTCatalogs") as String?) + ?.contains("HADOOP") == true + ) { + runtimeOnly(project(":polaris-extensions-federation-hadoop")) + } runtimeOnly(project(":polaris-relational-jdbc")) From 284ad407a1a8b2b7c3a23d2565f065f8d76eeac7 Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Fri, 8 Aug 2025 17:13:28 -0700 Subject: [PATCH 08/16] spotlessApply --- runtime/service/build.gradle.kts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/runtime/service/build.gradle.kts b/runtime/service/build.gradle.kts index dbaf602196..d3c93ffb9b 100644 --- a/runtime/service/build.gradle.kts +++ b/runtime/service/build.gradle.kts @@ -30,10 +30,7 @@ dependencies { implementation(project(":polaris-api-management-service")) implementation(project(":polaris-api-iceberg-service")) implementation(project(":polaris-api-catalog-service")) - if ( - (project.findProperty("NonRESTCatalogs") as String?) - ?.contains("HADOOP") == true - ) { + if ((project.findProperty("NonRESTCatalogs") as String?)?.contains("HADOOP") == true) { runtimeOnly(project(":polaris-extensions-federation-hadoop")) } From 790f81bb4c7718ee6e3b23adcfd0627bf912f2a9 Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Mon, 11 Aug 2025 13:56:23 -0700 Subject: [PATCH 09/16] Address review comments --- .../hadoop/HadoopFederatedCatalogFactory.java | 4 +- ...ctory.java => ExternalCatalogFactory.java} | 6 +- .../iceberg/IcebergCatalogAdapter.java | 10 +-- .../iceberg/IcebergCatalogHandler.java | 49 ++++++-------- .../IcebergRESTExternalCatalogFactory.java | 64 +++++++++++++++++++ .../IcebergCatalogHandlerAuthzTest.java | 12 ++-- .../apache/polaris/service/TestServices.java | 11 ++-- 7 files changed, 107 insertions(+), 49 deletions(-) rename polaris-core/src/main/java/org/apache/polaris/core/catalog/{NonRESTCatalogFactory.java => ExternalCatalogFactory.java} (89%) create mode 100644 runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java diff --git a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java index ae5bba341a..25089641a6 100644 --- a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java +++ b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java @@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.hadoop.HadoopCatalog; -import org.apache.polaris.core.catalog.NonRESTCatalogFactory; +import org.apache.polaris.core.catalog.ExternalCatalogFactory; import org.apache.polaris.core.connection.AuthenticationParametersDpo; import org.apache.polaris.core.connection.AuthenticationType; import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; @@ -35,7 +35,7 @@ /** Factory class for creating a Hadoop catalog handle based on connection configuration. */ @ApplicationScoped @Identifier("hadoop") -public class HadoopFederatedCatalogFactory implements NonRESTCatalogFactory { +public class HadoopFederatedCatalogFactory implements ExternalCatalogFactory { private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFederatedCatalogFactory.class); @Override diff --git a/polaris-core/src/main/java/org/apache/polaris/core/catalog/NonRESTCatalogFactory.java b/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java similarity index 89% rename from polaris-core/src/main/java/org/apache/polaris/core/catalog/NonRESTCatalogFactory.java rename to polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java index 0607507479..59c8903753 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/catalog/NonRESTCatalogFactory.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java @@ -23,15 +23,15 @@ import org.apache.polaris.core.secrets.UserSecretsManager; /** - * Factory interface for creating non-REST catalog handles based on connection configuration. + * Factory interface for creating external catalog handles based on connection configuration. * *

Implementations should be annotated with CDI annotations and use the @Identifier annotation to * specify which connection type they support. */ -public interface NonRESTCatalogFactory { +public interface ExternalCatalogFactory { /** - * Creates a catalog hadnle for the given non-REST connection configuration. + * Creates a catalog handle for the given connection configuration. * * @param connectionConfig the connection configuration * @param userSecretsManager the user secrets manager for handling credentials diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index 1ea1358011..ac079e71d9 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -63,7 +63,7 @@ import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; import org.apache.polaris.core.auth.PolarisAuthorizer; -import org.apache.polaris.core.catalog.NonRESTCatalogFactory; +import org.apache.polaris.core.catalog.ExternalCatalogFactory; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.PolarisEntity; @@ -147,7 +147,7 @@ public class IcebergCatalogAdapter private final CatalogPrefixParser prefixParser; private final ReservedProperties reservedProperties; private final CatalogHandlerUtils catalogHandlerUtils; - private final Instance nonRESTCatalogFactories; + private final Instance externalCatalogFactories; @Inject public IcebergCatalogAdapter( @@ -162,7 +162,7 @@ public IcebergCatalogAdapter( CatalogPrefixParser prefixParser, ReservedProperties reservedProperties, CatalogHandlerUtils catalogHandlerUtils, - @Any Instance nonRESTCatalogFactories) { + @Any Instance externalCatalogFactories) { this.realmContext = realmContext; this.callContext = callContext; this.catalogFactory = catalogFactory; @@ -174,7 +174,7 @@ public IcebergCatalogAdapter( this.prefixParser = prefixParser; this.reservedProperties = reservedProperties; this.catalogHandlerUtils = catalogHandlerUtils; - this.nonRESTCatalogFactories = nonRESTCatalogFactories; + this.externalCatalogFactories = externalCatalogFactories; } /** @@ -212,7 +212,7 @@ IcebergCatalogHandler newHandlerWrapper(SecurityContext securityContext, String polarisAuthorizer, reservedProperties, catalogHandlerUtils, - nonRESTCatalogFactories); + externalCatalogFactories); } @Override diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index dbbe4165f5..6e659c2df7 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -47,7 +47,6 @@ import org.apache.iceberg.UpdateRequirement; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.catalog.ViewCatalog; @@ -56,8 +55,6 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.rest.HTTPClient; -import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.rest.credentials.ImmutableCredential; import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; @@ -76,11 +73,10 @@ import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; import org.apache.polaris.core.auth.PolarisAuthorizableOperation; import org.apache.polaris.core.auth.PolarisAuthorizer; -import org.apache.polaris.core.catalog.NonRESTCatalogFactory; +import org.apache.polaris.core.catalog.ExternalCatalogFactory; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; import org.apache.polaris.core.connection.ConnectionType; -import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.CatalogEntity; import org.apache.polaris.core.entity.PolarisEntitySubType; @@ -130,7 +126,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab private final ReservedProperties reservedProperties; private final CatalogHandlerUtils catalogHandlerUtils; - private final Instance nonRESTCatalogFactories; + private final Instance externalCatalogFactories; // Catalog instance will be initialized after authorizing resolver successfully resolves // the catalog entity. @@ -152,14 +148,14 @@ public IcebergCatalogHandler( PolarisAuthorizer authorizer, ReservedProperties reservedProperties, CatalogHandlerUtils catalogHandlerUtils, - Instance nonRESTCatalogFactories) { + Instance externalCatalogFactories) { super(callContext, resolutionManifestFactory, securityContext, catalogName, authorizer); this.metaStoreManager = metaStoreManager; this.userSecretsManager = userSecretsManager; this.catalogFactory = catalogFactory; this.reservedProperties = reservedProperties; this.catalogHandlerUtils = catalogHandlerUtils; - this.nonRESTCatalogFactories = nonRESTCatalogFactories; + this.externalCatalogFactories = externalCatalogFactories; } /** @@ -221,34 +217,31 @@ protected void initializeCatalog() { Catalog federatedCatalog; ConnectionType connectionType = ConnectionType.fromCode(connectionConfigInfoDpo.getConnectionTypeCode()); + + // Use the unified factory pattern for all external catalog types + String factoryIdentifier; switch (connectionType) { case ICEBERG_REST: - SessionCatalog.SessionContext context = SessionCatalog.SessionContext.createEmpty(); - federatedCatalog = - new RESTCatalog( - context, - (config) -> - HTTPClient.builder(config) - .uri(config.get(org.apache.iceberg.CatalogProperties.URI)) - .build()); - federatedCatalog.initialize( - ((IcebergRestConnectionConfigInfoDpo) connectionConfigInfoDpo).getRemoteCatalogName(), - connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager())); + factoryIdentifier = "iceberg-rest"; break; case HADOOP: - // Use CDI to select the Hadoop federation factory at runtime - Instance hadoopFactory = - nonRESTCatalogFactories.select(Identifier.Literal.of("hadoop")); - if (!hadoopFactory.isUnsatisfied()) { - federatedCatalog = - hadoopFactory.get().createCatalog(connectionConfigInfoDpo, getUserSecretsManager()); - } else { - throw new UnsupportedOperationException("Hadoop federation factory unavailable."); - } + factoryIdentifier = "hadoop"; break; default: throw new UnsupportedOperationException("Unsupported connection type: " + connectionType); } + + Instance externalCatalogFactory = + externalCatalogFactories.select(Identifier.Literal.of(factoryIdentifier)); + if (!externalCatalogFactory.isUnsatisfied()) { + federatedCatalog = + externalCatalogFactory + .get() + .createCatalog(connectionConfigInfoDpo, getUserSecretsManager()); + } else { + throw new UnsupportedOperationException( + "External catalog factory for type '" + connectionType + "' is unavailable."); + } this.baseCatalog = federatedCatalog; } else { LOGGER.atInfo().log("Initializing non-federated catalog"); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java new file mode 100644 index 0000000000..57cfcf8ae6 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.catalog.iceberg; + +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.rest.HTTPClient; +import org.apache.iceberg.rest.RESTCatalog; +import org.apache.polaris.core.catalog.ExternalCatalogFactory; +import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; +import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo; +import org.apache.polaris.core.secrets.UserSecretsManager; + +/** Factory class for creating an Iceberg REST catalog handle based on connection configuration. */ +@ApplicationScoped +@Identifier("iceberg-rest") +public class IcebergRESTExternalCatalogFactory implements ExternalCatalogFactory { + + @Override + public Catalog createCatalog( + ConnectionConfigInfoDpo connectionConfig, UserSecretsManager userSecretsManager) { + if (!(connectionConfig instanceof IcebergRestConnectionConfigInfoDpo)) { + throw new IllegalArgumentException( + "Expected IcebergRestConnectionConfigInfoDpo but got: " + + connectionConfig.getClass().getSimpleName()); + } + + IcebergRestConnectionConfigInfoDpo icebergConfig = + (IcebergRestConnectionConfigInfoDpo) connectionConfig; + + SessionCatalog.SessionContext context = SessionCatalog.SessionContext.createEmpty(); + RESTCatalog federatedCatalog = + new RESTCatalog( + context, + (config) -> + HTTPClient.builder(config) + .uri(config.get(org.apache.iceberg.CatalogProperties.URI)) + .build()); + + federatedCatalog.initialize( + icebergConfig.getRemoteCatalogName(), + connectionConfig.asIcebergCatalogProperties(userSecretsManager)); + + return federatedCatalog; + } +} diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java index bb58b3fbe0..90d7341a95 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java @@ -56,7 +56,7 @@ import org.apache.polaris.core.admin.model.PrincipalWithCredentialsCredentials; import org.apache.polaris.core.admin.model.StorageConfigInfo; import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; -import org.apache.polaris.core.catalog.NonRESTCatalogFactory; +import org.apache.polaris.core.catalog.ExternalCatalogFactory; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.CatalogEntity; import org.apache.polaris.core.entity.CatalogRoleEntity; @@ -82,8 +82,8 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { @SuppressWarnings("unchecked") - private static Instance emptyNonRESTCatalogFactory() { - Instance mock = Mockito.mock(Instance.class); + private static Instance emptyExternalCatalogFactory() { + Instance mock = Mockito.mock(Instance.class); Mockito.when(mock.select(Mockito.any())).thenReturn(mock); Mockito.when(mock.isUnsatisfied()).thenReturn(true); return mock; @@ -112,7 +112,7 @@ private IcebergCatalogHandler newWrapper( polarisAuthorizer, reservedProperties, catalogHandlerUtils, - emptyNonRESTCatalogFactory()); + emptyExternalCatalogFactory()); } /** @@ -254,7 +254,7 @@ public void testInsufficientPermissionsPriorToSecretRotation() { polarisAuthorizer, reservedProperties, catalogHandlerUtils, - emptyNonRESTCatalogFactory()); + emptyExternalCatalogFactory()); // a variety of actions are all disallowed because the principal's credentials must be rotated doTestInsufficientPrivileges( @@ -290,7 +290,7 @@ public void testInsufficientPermissionsPriorToSecretRotation() { polarisAuthorizer, reservedProperties, catalogHandlerUtils, - emptyNonRESTCatalogFactory()); + emptyExternalCatalogFactory()); doTestSufficientPrivilegeSets( List.of(Set.of(PolarisPrivilege.NAMESPACE_LIST)), diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 493049ed2c..e7288c441e 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -37,7 +37,7 @@ import org.apache.polaris.core.PolarisDiagnostics; import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; import org.apache.polaris.core.auth.PolarisAuthorizer; -import org.apache.polaris.core.catalog.NonRESTCatalogFactory; +import org.apache.polaris.core.catalog.ExternalCatalogFactory; import org.apache.polaris.core.config.PolarisConfigurationStore; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; @@ -217,9 +217,10 @@ public TestServices build() { CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(callContext.getRealmConfig()); - Instance nonRESTCatalogFactory = Mockito.mock(Instance.class); - Mockito.when(nonRESTCatalogFactory.select(Mockito.any())).thenReturn(nonRESTCatalogFactory); - Mockito.when(nonRESTCatalogFactory.isUnsatisfied()).thenReturn(true); + @SuppressWarnings("unchecked") + Instance externalCatalogFactory = Mockito.mock(Instance.class); + Mockito.when(externalCatalogFactory.select(Mockito.any())).thenReturn(externalCatalogFactory); + Mockito.when(externalCatalogFactory.isUnsatisfied()).thenReturn(true); IcebergCatalogAdapter catalogService = new IcebergCatalogAdapter( @@ -234,7 +235,7 @@ public TestServices build() { new DefaultCatalogPrefixParser(), reservedProperties, catalogHandlerUtils, - nonRESTCatalogFactory); + externalCatalogFactory); IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(catalogService); IcebergRestConfigurationApi restConfigurationApi = From 05dbe0c3afcae01644c44381d73d1326532b067e Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Tue, 12 Aug 2025 11:11:25 -0700 Subject: [PATCH 10/16] Address review comments --- extensions/federation/hadoop/build.gradle.kts | 6 ------ .../hadoop/HadoopFederatedCatalogFactory.java | 3 ++- .../core/connection/ConnectionType.java | 21 +++++++++++++++++++ .../iceberg/IcebergCatalogHandler.java | 4 +--- .../IcebergRESTExternalCatalogFactory.java | 3 ++- 5 files changed, 26 insertions(+), 11 deletions(-) diff --git a/extensions/federation/hadoop/build.gradle.kts b/extensions/federation/hadoop/build.gradle.kts index 9e06917ce2..431da94e52 100644 --- a/extensions/federation/hadoop/build.gradle.kts +++ b/extensions/federation/hadoop/build.gradle.kts @@ -55,9 +55,3 @@ dependencies { // Logging implementation(libs.slf4j.api) } - -java { toolchain { languageVersion.set(JavaLanguageVersion.of(21)) } } - -tasks.withType { options.encoding = "UTF-8" } - -tasks.test { useJUnitPlatform() } diff --git a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java index 25089641a6..50294da99c 100644 --- a/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java +++ b/extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java @@ -27,6 +27,7 @@ import org.apache.polaris.core.connection.AuthenticationParametersDpo; import org.apache.polaris.core.connection.AuthenticationType; import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; +import org.apache.polaris.core.connection.ConnectionType; import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo; import org.apache.polaris.core.secrets.UserSecretsManager; import org.slf4j.Logger; @@ -34,7 +35,7 @@ /** Factory class for creating a Hadoop catalog handle based on connection configuration. */ @ApplicationScoped -@Identifier("hadoop") +@Identifier(ConnectionType.HADOOP_FACTORY_IDENTIFIER) public class HadoopFederatedCatalogFactory implements ExternalCatalogFactory { private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFederatedCatalogFactory.class); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java index 441c0c4c53..2e09366a31 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java @@ -35,6 +35,9 @@ public enum ConnectionType { HADOOP(2), ; + public static final String ICEBERG_REST_FACTORY_IDENTIFIER = "ICEBERG_REST"; + public static final String HADOOP_FACTORY_IDENTIFIER = "HADOOP"; + private static final ConnectionType[] REVERSE_MAPPING_ARRAY; static { @@ -77,4 +80,22 @@ public enum ConnectionType { public int getCode() { return this.code; } + + /** + * Get the factory identifier string used for CDI injection of the appropriate + * ExternalCatalogFactory. + * + * @return the factory identifier string + */ + public String getFactoryIdentifier() { + switch (this) { + case ICEBERG_REST: + return ICEBERG_REST_FACTORY_IDENTIFIER; + case HADOOP: + return HADOOP_FACTORY_IDENTIFIER; + default: + throw new UnsupportedOperationException( + "No factory identifier for connection type: " + this); + } + } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 6e659c2df7..abd57e4362 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -222,10 +222,8 @@ protected void initializeCatalog() { String factoryIdentifier; switch (connectionType) { case ICEBERG_REST: - factoryIdentifier = "iceberg-rest"; - break; case HADOOP: - factoryIdentifier = "hadoop"; + factoryIdentifier = connectionType.getFactoryIdentifier(); break; default: throw new UnsupportedOperationException("Unsupported connection type: " + connectionType); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java index 57cfcf8ae6..05de201c37 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRESTExternalCatalogFactory.java @@ -26,12 +26,13 @@ import org.apache.iceberg.rest.RESTCatalog; import org.apache.polaris.core.catalog.ExternalCatalogFactory; import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; +import org.apache.polaris.core.connection.ConnectionType; import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo; import org.apache.polaris.core.secrets.UserSecretsManager; /** Factory class for creating an Iceberg REST catalog handle based on connection configuration. */ @ApplicationScoped -@Identifier("iceberg-rest") +@Identifier(ConnectionType.ICEBERG_REST_FACTORY_IDENTIFIER) public class IcebergRESTExternalCatalogFactory implements ExternalCatalogFactory { @Override From 6e78c72b04a75ab784d37fd52280c9183a7d093e Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Tue, 12 Aug 2025 13:19:05 -0700 Subject: [PATCH 11/16] Remove the switch statement --- .../catalog/iceberg/IcebergCatalogHandler.java | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index abd57e4362..af0c4dcfb3 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -219,18 +219,9 @@ protected void initializeCatalog() { ConnectionType.fromCode(connectionConfigInfoDpo.getConnectionTypeCode()); // Use the unified factory pattern for all external catalog types - String factoryIdentifier; - switch (connectionType) { - case ICEBERG_REST: - case HADOOP: - factoryIdentifier = connectionType.getFactoryIdentifier(); - break; - default: - throw new UnsupportedOperationException("Unsupported connection type: " + connectionType); - } - Instance externalCatalogFactory = - externalCatalogFactories.select(Identifier.Literal.of(factoryIdentifier)); + externalCatalogFactories.select( + Identifier.Literal.of(connectionType.getFactoryIdentifier())); if (!externalCatalogFactory.isUnsatisfied()) { federatedCatalog = externalCatalogFactory From 7c7e257673183e869f0ddc7449892665a919205c Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Tue, 12 Aug 2025 14:04:59 -0700 Subject: [PATCH 12/16] Add a README --- extensions/federation/README.md | 9 +++++++++ runtime/server/build.gradle.kts | 4 ++++ runtime/service/build.gradle.kts | 3 --- 3 files changed, 13 insertions(+), 3 deletions(-) create mode 100644 extensions/federation/README.md diff --git a/extensions/federation/README.md b/extensions/federation/README.md new file mode 100644 index 0000000000..94e9f2d255 --- /dev/null +++ b/extensions/federation/README.md @@ -0,0 +1,9 @@ +### Using the `HadoopFederatedCatalogFactory` + +This module is an independent compilation unit and will be built into the Polaris binary only when the following flag is set in gradle.properties or as a JVM arg at compile time: + +``` +NonRESTCatalogs=HADOOP, +``` + +Without this flag, the Hadoop factory won't be compiled into Polaris and therefore Polaris will not load the class at runtime, throwing an unsupported exception for federated catalog calls. \ No newline at end of file diff --git a/runtime/server/build.gradle.kts b/runtime/server/build.gradle.kts index 2ebd153812..98d8d36e1c 100644 --- a/runtime/server/build.gradle.kts +++ b/runtime/server/build.gradle.kts @@ -49,6 +49,10 @@ dependencies { runtimeOnly(project(":polaris-relational-jdbc")) runtimeOnly("io.quarkus:quarkus-jdbc-postgresql") + if ((project.findProperty("NonRESTCatalogs") as String?)?.contains("HADOOP") == true) { + runtimeOnly(project(":polaris-extensions-federation-hadoop")) + } + // enforce the Quarkus _platform_ here, to get a consistent and validated set of dependencies implementation(enforcedPlatform(libs.quarkus.bom)) implementation("io.quarkus:quarkus-container-image-docker") diff --git a/runtime/service/build.gradle.kts b/runtime/service/build.gradle.kts index d3c93ffb9b..037776c404 100644 --- a/runtime/service/build.gradle.kts +++ b/runtime/service/build.gradle.kts @@ -30,9 +30,6 @@ dependencies { implementation(project(":polaris-api-management-service")) implementation(project(":polaris-api-iceberg-service")) implementation(project(":polaris-api-catalog-service")) - if ((project.findProperty("NonRESTCatalogs") as String?)?.contains("HADOOP") == true) { - runtimeOnly(project(":polaris-extensions-federation-hadoop")) - } runtimeOnly(project(":polaris-relational-jdbc")) From 4fb796743a5afdd72df7106016049618e6163856 Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Tue, 12 Aug 2025 14:11:29 -0700 Subject: [PATCH 13/16] Remove double negative in factory resolution check --- .../polaris/service/catalog/iceberg/IcebergCatalogHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index af0c4dcfb3..266ac11e49 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -222,7 +222,7 @@ protected void initializeCatalog() { Instance externalCatalogFactory = externalCatalogFactories.select( Identifier.Literal.of(connectionType.getFactoryIdentifier())); - if (!externalCatalogFactory.isUnsatisfied()) { + if (externalCatalogFactory.isResolvable()) { federatedCatalog = externalCatalogFactory .get() From 1cbafb3ee2eea9ccf5c31a22ca3df8dfc559dde5 Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Tue, 12 Aug 2025 14:21:10 -0700 Subject: [PATCH 14/16] Update the README --- extensions/federation/README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/extensions/federation/README.md b/extensions/federation/README.md index 94e9f2d255..209ad1a891 100644 --- a/extensions/federation/README.md +++ b/extensions/federation/README.md @@ -1,9 +1,13 @@ ### Using the `HadoopFederatedCatalogFactory` -This module is an independent compilation unit and will be built into the Polaris binary only when the following flag is set in gradle.properties or as a JVM arg at compile time: - +This `HadoopFederatedCatalogFactory` module is an independent compilation unit and will be built into the Polaris binary only when the following flag is set in the gradle.properties file: ``` NonRESTCatalogs=HADOOP, ``` +The other option is to pass it as an argument to the gradle JVM as follows: +``` +./gradlew build DNonRESTCatalogs=HADOOP +``` + Without this flag, the Hadoop factory won't be compiled into Polaris and therefore Polaris will not load the class at runtime, throwing an unsupported exception for federated catalog calls. \ No newline at end of file From 1902d2173d81e83613051a4789c8cdbf9f064905 Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Tue, 12 Aug 2025 14:23:24 -0700 Subject: [PATCH 15/16] Update README.md --- extensions/federation/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/federation/README.md b/extensions/federation/README.md index 209ad1a891..8c7943e770 100644 --- a/extensions/federation/README.md +++ b/extensions/federation/README.md @@ -7,7 +7,7 @@ NonRESTCatalogs=HADOOP, The other option is to pass it as an argument to the gradle JVM as follows: ``` -./gradlew build DNonRESTCatalogs=HADOOP +./gradlew build -DNonRESTCatalogs=HADOOP ``` Without this flag, the Hadoop factory won't be compiled into Polaris and therefore Polaris will not load the class at runtime, throwing an unsupported exception for federated catalog calls. \ No newline at end of file From 103fe6fe1ab851008d28fc97b193d41d4f9c8724 Mon Sep 17 00:00:00 2001 From: Pooja Nilangekar Date: Tue, 12 Aug 2025 15:23:12 -0700 Subject: [PATCH 16/16] Add license to README --- extensions/federation/README.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/extensions/federation/README.md b/extensions/federation/README.md index 8c7943e770..bd4c8d6928 100644 --- a/extensions/federation/README.md +++ b/extensions/federation/README.md @@ -1,3 +1,21 @@ + ### Using the `HadoopFederatedCatalogFactory` This `HadoopFederatedCatalogFactory` module is an independent compilation unit and will be built into the Polaris binary only when the following flag is set in the gradle.properties file: