diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java index 2082c0584608..40fd7a67f12f 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java @@ -30,19 +30,18 @@ import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog; import org.apache.spark.sql.util.CaseInsensitiveStringMap; -abstract class BaseCatalog - implements StagingTableCatalog, +interface BaseCatalog + extends StagingTableCatalog, ProcedureCatalog, SupportsNamespaces, HasIcebergCatalog, SupportsFunctions { - private static final String USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS = "use-nullable-query-schema"; - private static final boolean USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT = true; - private boolean useNullableQuerySchema = USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT; + String USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS = "use-nullable-query-schema"; + boolean USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT = true; @Override - public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException { + default Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException { String[] namespace = ident.namespace(); String name = ident.name(); @@ -59,7 +58,7 @@ public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException } @Override - public boolean isFunctionNamespace(String[] namespace) { + default boolean isFunctionNamespace(String[] namespace) { // Allow for empty namespace, as Spark's storage partitioned joins look up // the corresponding functions to generate transforms for partitioning // with an empty namespace, such as `bucket`. @@ -68,24 +67,17 @@ public boolean isFunctionNamespace(String[] namespace) { } @Override - public boolean isExistingNamespace(String[] namespace) { + default boolean isExistingNamespace(String[] namespace) { return namespaceExists(namespace); } - @Override - public void initialize(String name, CaseInsensitiveStringMap options) { - this.useNullableQuerySchema = - PropertyUtil.propertyAsBoolean( + private boolean useNullableQuerySchema (CaseInsensitiveStringMap options) { + return PropertyUtil.propertyAsBoolean( options, USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS, USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT); } - @Override - public boolean useNullableQuerySchema() { - return useNullableQuerySchema; - } - private static boolean isSystemNamespace(String[] namespace) { return namespace.length == 1 && namespace[0].equalsIgnoreCase("system"); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 0c361598623e..c97e16a6bf1b 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -120,8 +120,8 @@ * *

*/ -public class SparkCatalog extends BaseCatalog - implements org.apache.spark.sql.connector.catalog.ViewCatalog, SupportsReplaceView { +public class SparkCatalog + implements BaseCatalog, org.apache.spark.sql.connector.catalog.ViewCatalog, SupportsReplaceView { private static final Set DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER); private static final Splitter COMMA = Splitter.on(","); private static final Joiner COMMA_JOINER = Joiner.on(","); @@ -726,8 +726,6 @@ public void renameView(Identifier fromIdentifier, Identifier toIdentifier) @Override public final void initialize(String name, CaseInsensitiveStringMap options) { - super.initialize(name, options); - this.cacheEnabled = PropertyUtil.propertyAsBoolean( options, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java index fa3f1fbe4b2a..00395dd46a1a 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; import org.apache.spark.sql.connector.catalog.CatalogExtension; import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension; import org.apache.spark.sql.connector.catalog.FunctionCatalog; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.NamespaceChange; @@ -56,13 +57,12 @@ * SupportsNamespaces. */ public class SparkSessionCatalog - extends BaseCatalog implements CatalogExtension { + extends DelegatingCatalogExtension implements BaseCatalog { private static final String[] DEFAULT_NAMESPACE = new String[] {"default"}; private String catalogName = null; private TableCatalog icebergCatalog = null; private StagingTableCatalog asStagingCatalog = null; - private T sessionCatalog = null; private boolean createParquetAsIceberg = false; private boolean createAvroAsIceberg = false; private boolean createOrcAsIceberg = false; @@ -88,57 +88,12 @@ public String[] defaultNamespace() { return DEFAULT_NAMESPACE; } - @Override - public String[][] listNamespaces() throws NoSuchNamespaceException { - return getSessionCatalog().listNamespaces(); - } - - @Override - public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException { - return getSessionCatalog().listNamespaces(namespace); - } - - @Override - public boolean namespaceExists(String[] namespace) { - return getSessionCatalog().namespaceExists(namespace); - } - - @Override - public Map loadNamespaceMetadata(String[] namespace) - throws NoSuchNamespaceException { - return getSessionCatalog().loadNamespaceMetadata(namespace); - } - - @Override - public void createNamespace(String[] namespace, Map metadata) - throws NamespaceAlreadyExistsException { - getSessionCatalog().createNamespace(namespace, metadata); - } - - @Override - public void alterNamespace(String[] namespace, NamespaceChange... changes) - throws NoSuchNamespaceException { - getSessionCatalog().alterNamespace(namespace, changes); - } - - @Override - public boolean dropNamespace(String[] namespace, boolean cascade) - throws NoSuchNamespaceException, NonEmptyNamespaceException { - return getSessionCatalog().dropNamespace(namespace, cascade); - } - - @Override - public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException { - // delegate to the session catalog because all tables share the same namespace - return getSessionCatalog().listTables(namespace); - } - @Override public Table loadTable(Identifier ident) throws NoSuchTableException { try { return icebergCatalog.loadTable(ident); } catch (NoSuchTableException e) { - return getSessionCatalog().loadTable(ident); + return super.loadTable(ident); } } @@ -147,7 +102,7 @@ public Table loadTable(Identifier ident, String version) throws NoSuchTableExcep try { return icebergCatalog.loadTable(ident, version); } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { - return getSessionCatalog().loadTable(ident, version); + return super.loadTable(ident, version); } } @@ -156,7 +111,7 @@ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExcep try { return icebergCatalog.loadTable(ident, timestamp); } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { - return getSessionCatalog().loadTable(ident, timestamp); + return super.loadTable(ident, timestamp); } } @@ -165,7 +120,7 @@ public void invalidateTable(Identifier ident) { // We do not need to check whether the table exists and whether // it is an Iceberg table to reduce remote service requests. icebergCatalog.invalidateTable(ident); - getSessionCatalog().invalidateTable(ident); + super.invalidateTable(ident); } @Override @@ -177,7 +132,7 @@ public Table createTable( return icebergCatalog.createTable(ident, schema, partitions, properties); } else { // delegate to the session catalog - return getSessionCatalog().createTable(ident, schema, partitions, properties); + return super.createTable(ident, schema, partitions, properties); } } @@ -193,7 +148,7 @@ public StagedTable stageCreate( } catalog = icebergCatalog; } else { - catalog = getSessionCatalog(); + throw new UnsupportedOperationException("Cannot stage a table create on the Session Catalog"); } // create the table with the session catalog, then wrap it in a staged table that will delete to @@ -214,7 +169,7 @@ public StagedTable stageReplace( } catalog = icebergCatalog; } else { - catalog = getSessionCatalog(); + throw new UnsupportedOperationException("Cannot stage a table replace on the Session Catalog"); } // attempt to drop the table and fail if it doesn't exist @@ -246,7 +201,7 @@ public StagedTable stageCreateOrReplace( } catalog = icebergCatalog; } else { - catalog = getSessionCatalog(); + throw new UnsupportedOperationException("Cannot stage a table create or replace on the Session Catalog"); } // drop the table if it exists @@ -269,7 +224,7 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchT if (icebergCatalog.tableExists(ident)) { return icebergCatalog.alterTable(ident, changes); } else { - return getSessionCatalog().alterTable(ident, changes); + return super.alterTable(ident, changes); } } @@ -278,7 +233,7 @@ public boolean dropTable(Identifier ident) { // no need to check table existence to determine which catalog to use. if a table doesn't exist // then both are // required to return false. - return icebergCatalog.dropTable(ident) || getSessionCatalog().dropTable(ident); + return icebergCatalog.dropTable(ident) || super.dropTable(ident); } @Override @@ -286,7 +241,7 @@ public boolean purgeTable(Identifier ident) { // no need to check table existence to determine which catalog to use. if a table doesn't exist // then both are // required to return false. - return icebergCatalog.purgeTable(ident) || getSessionCatalog().purgeTable(ident); + return icebergCatalog.purgeTable(ident) || super.purgeTable(ident); } @Override @@ -298,14 +253,16 @@ public void renameTable(Identifier from, Identifier to) if (icebergCatalog.tableExists(from)) { icebergCatalog.renameTable(from, to); } else { - getSessionCatalog().renameTable(from, to); + super.renameTable(from, to); } } + /** + * Removed + * + * @Override public final void initialize(String name, CaseInsensitiveStringMap options) { - super.initialize(name, options); - if (options.containsKey(CatalogUtil.ICEBERG_CATALOG_TYPE) && options .get(CatalogUtil.ICEBERG_CATALOG_TYPE) @@ -323,6 +280,7 @@ public final void initialize(String name, CaseInsensitiveStringMap options) { this.createAvroAsIceberg = options.getBoolean("avro-enabled", createAvroAsIceberg); this.createOrcAsIceberg = options.getBoolean("orc-enabled", createOrcAsIceberg); } + **/ private void validateHmsUri(String catalogHmsUri) { if (catalogHmsUri == null) { @@ -342,18 +300,6 @@ private void validateHmsUri(String catalogHmsUri) { catalogHmsUri); } - @Override - @SuppressWarnings("unchecked") - public void setDelegateCatalog(CatalogPlugin sparkSessionCatalog) { - if (sparkSessionCatalog instanceof TableCatalog - && sparkSessionCatalog instanceof FunctionCatalog - && sparkSessionCatalog instanceof SupportsNamespaces) { - this.sessionCatalog = (T) sparkSessionCatalog; - } else { - throw new IllegalArgumentException("Invalid session catalog: " + sparkSessionCatalog); - } - } - @Override public String name() { return catalogName; @@ -373,13 +319,6 @@ private boolean useIceberg(String provider) { return false; } - private T getSessionCatalog() { - Preconditions.checkNotNull( - sessionCatalog, - "Delegated SessionCatalog is missing. " - + "Please make sure your are replacing Spark's default catalog, named 'spark_catalog'."); - return sessionCatalog; - } @Override public Catalog icebergCatalog() { @@ -392,9 +331,9 @@ public Catalog icebergCatalog() { @Override public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException { try { - return super.loadFunction(ident); + return loadFunction(ident); } catch (NoSuchFunctionException e) { - return getSessionCatalog().loadFunction(ident); + return super.loadFunction(ident); } } }