BaseCatalog.java

@@ -30,19 +30,18 @@
 import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;

-abstract class BaseCatalog
-    implements StagingTableCatalog,
+interface BaseCatalog
+    extends StagingTableCatalog,
         ProcedureCatalog,
         SupportsNamespaces,
         HasIcebergCatalog,
         SupportsFunctions {
-  private static final String USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS = "use-nullable-query-schema";
-  private static final boolean USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT = true;
-
-  private boolean useNullableQuerySchema = USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT;
+  String USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS = "use-nullable-query-schema";
+  boolean USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT = true;

   @Override
-  public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException {
+  default Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException {
     String[] namespace = ident.namespace();
     String name = ident.name();

@@ -59,7 +58,7 @@ public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException
   }

   @Override
-  public boolean isFunctionNamespace(String[] namespace) {
+  default boolean isFunctionNamespace(String[] namespace) {
     // Allow for empty namespace, as Spark's storage partitioned joins look up
     // the corresponding functions to generate transforms for partitioning
     // with an empty namespace, such as `bucket`.
@@ -68,24 +67,17 @@ public boolean isFunctionNamespace(String[] namespace) {
   }

   @Override
-  public boolean isExistingNamespace(String[] namespace) {
+  default boolean isExistingNamespace(String[] namespace) {
     return namespaceExists(namespace);
   }

-  @Override
-  public void initialize(String name, CaseInsensitiveStringMap options) {
-    this.useNullableQuerySchema =
-        PropertyUtil.propertyAsBoolean(
+  private boolean useNullableQuerySchema(CaseInsensitiveStringMap options) {
+    return PropertyUtil.propertyAsBoolean(
         options,
         USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS,
         USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT);
   }

-  @Override
-  public boolean useNullableQuerySchema() {
-    return useNullableQuerySchema;
-  }
-
   private static boolean isSystemNamespace(String[] namespace) {
     return namespace.length == 1 && namespace[0].equalsIgnoreCase("system");
   }
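
To make the mechanics of the change above concrete, here is a minimal, self-contained sketch of what the abstract-class-to-interface conversion implies. The names are toy stand-ins, not the real Iceberg or Spark types: interface fields become implicit public static final constants, shared behavior moves into default methods, and per-instance state such as the old useNullableQuerySchema flag has to move into each implementing class.

```java
import java.util.Map;

// Toy analogue of the BaseCatalog change above (illustrative names, not the real types).
interface MiniBaseCatalog {
  // interface fields are implicitly public static final, like the new
  // USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS constants
  String USE_NULLABLE_QUERY_SCHEMA = "use-nullable-query-schema";
  boolean USE_NULLABLE_QUERY_SCHEMA_DEFAULT = true;

  // shared behavior becomes a default method instead of an inherited concrete method
  default boolean isSystemNamespace(String[] namespace) {
    return namespace.length == 1 && namespace[0].equalsIgnoreCase("system");
  }
}

// The mutable flag that the abstract class used to hold now has to live in the implementor.
class MiniSparkCatalog implements MiniBaseCatalog {
  private final boolean useNullableQuerySchema;

  MiniSparkCatalog(Map<String, String> options) {
    this.useNullableQuerySchema =
        Boolean.parseBoolean(
            options.getOrDefault(
                USE_NULLABLE_QUERY_SCHEMA, String.valueOf(USE_NULLABLE_QUERY_SCHEMA_DEFAULT)));
  }

  boolean useNullableQuerySchema() {
    return useNullableQuerySchema;
  }
}
```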

SparkCatalog.java

@@ -120,8 +120,8 @@
  *
  * <p>
  */
-public class SparkCatalog extends BaseCatalog
-    implements org.apache.spark.sql.connector.catalog.ViewCatalog, SupportsReplaceView {
+public class SparkCatalog
+    implements BaseCatalog, org.apache.spark.sql.connector.catalog.ViewCatalog, SupportsReplaceView {
   private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
   private static final Splitter COMMA = Splitter.on(",");
   private static final Joiner COMMA_JOINER = Joiner.on(",");
@@ -726,8 +726,6 @@ public void renameView(Identifier fromIdentifier, Identifier toIdentifier)

   @Override
   public final void initialize(String name, CaseInsensitiveStringMap options) {
-    super.initialize(name, options);
-
     this.cacheEnabled =
         PropertyUtil.propertyAsBoolean(
             options, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);

SparkSessionCatalog.java

@@ -35,6 +35,7 @@
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.apache.spark.sql.connector.catalog.CatalogExtension;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
 import org.apache.spark.sql.connector.catalog.FunctionCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
@@ -56,13 +57,12 @@
  * SupportsNamespaces.
  */
 public class SparkSessionCatalog<T extends TableCatalog & FunctionCatalog & SupportsNamespaces>
-    extends BaseCatalog implements CatalogExtension {
+    extends DelegatingCatalogExtension implements BaseCatalog {
   private static final String[] DEFAULT_NAMESPACE = new String[] {"default"};

   private String catalogName = null;
   private TableCatalog icebergCatalog = null;
   private StagingTableCatalog asStagingCatalog = null;
-  private T sessionCatalog = null;
   private boolean createParquetAsIceberg = false;
   private boolean createAvroAsIceberg = false;
   private boolean createOrcAsIceberg = false;
@@ -88,57 +88,12 @@ public String[] defaultNamespace() {
     return DEFAULT_NAMESPACE;
   }

-  @Override
-  public String[][] listNamespaces() throws NoSuchNamespaceException {
-    return getSessionCatalog().listNamespaces();
-  }
-
-  @Override
-  public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
-    return getSessionCatalog().listNamespaces(namespace);
-  }
-
-  @Override
-  public boolean namespaceExists(String[] namespace) {
-    return getSessionCatalog().namespaceExists(namespace);
-  }
-
-  @Override
-  public Map<String, String> loadNamespaceMetadata(String[] namespace)
-      throws NoSuchNamespaceException {
-    return getSessionCatalog().loadNamespaceMetadata(namespace);
-  }
-
-  @Override
-  public void createNamespace(String[] namespace, Map<String, String> metadata)
-      throws NamespaceAlreadyExistsException {
-    getSessionCatalog().createNamespace(namespace, metadata);
-  }
-
-  @Override
-  public void alterNamespace(String[] namespace, NamespaceChange... changes)
-      throws NoSuchNamespaceException {
-    getSessionCatalog().alterNamespace(namespace, changes);
-  }
-
-  @Override
-  public boolean dropNamespace(String[] namespace, boolean cascade)
-      throws NoSuchNamespaceException, NonEmptyNamespaceException {
-    return getSessionCatalog().dropNamespace(namespace, cascade);
-  }
-
-  @Override
-  public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
-    // delegate to the session catalog because all tables share the same namespace
-    return getSessionCatalog().listTables(namespace);
-  }
-
   @Override
   public Table loadTable(Identifier ident) throws NoSuchTableException {
     try {
       return icebergCatalog.loadTable(ident);
     } catch (NoSuchTableException e) {
-      return getSessionCatalog().loadTable(ident);
+      return super.loadTable(ident);
     }
   }

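The block of overrides deleted above is the heart of the restructuring: SparkSessionCatalog used to delegate every namespace and table-listing call by hand through getSessionCatalog(). A minimal sketch of the pattern this branch relies on instead, assuming (as the loadTable changes do) that DelegatingCatalogExtension forwards these methods to the delegate that Spark injects via setDelegateCatalog() and leaves them open to override; the class name is illustrative, not the PR's code.

```java
import java.util.Map;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;

// A subclass of DelegatingCatalogExtension overrides only what it customizes and lets
// the base class forward everything else (listNamespaces, createNamespace, listTables,
// and so on) to the session catalog that Spark injects via setDelegateCatalog().
abstract class ForwardingCatalogSketch extends DelegatingCatalogExtension {

  @Override
  public Map<String, String> loadNamespaceMetadata(String[] namespace)
      throws NoSuchNamespaceException {
    // super.* reaches the delegate (spark_catalog), which is what the removed
    // getSessionCatalog().* calls used to do explicitly
    return super.loadNamespaceMetadata(namespace);
  }
}
```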
@@ -147,7 +102,7 @@ public Table loadTable(Identifier ident, String version) throws NoSuchTableExcep
     try {
       return icebergCatalog.loadTable(ident, version);
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
-      return getSessionCatalog().loadTable(ident, version);
+      return super.loadTable(ident, version);
     }
   }

@@ -156,7 +111,7 @@ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExcep
     try {
       return icebergCatalog.loadTable(ident, timestamp);
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
-      return getSessionCatalog().loadTable(ident, timestamp);
+      return super.loadTable(ident, timestamp);
     }
   }

@@ -165,7 +120,7 @@ public void invalidateTable(Identifier ident) {
     // We do not need to check whether the table exists and whether
     // it is an Iceberg table to reduce remote service requests.
     icebergCatalog.invalidateTable(ident);
-    getSessionCatalog().invalidateTable(ident);
+    super.invalidateTable(ident);
   }

@@ -177,7 +132,7 @@ public Table createTable(
       return icebergCatalog.createTable(ident, schema, partitions, properties);
     } else {
       // delegate to the session catalog
-      return getSessionCatalog().createTable(ident, schema, partitions, properties);
+      return super.createTable(ident, schema, partitions, properties);
     }
   }

@@ -193,7 +148,7 @@ public StagedTable stageCreate(
       }
       catalog = icebergCatalog;
     } else {
-      catalog = getSessionCatalog();
+      throw new UnsupportedOperationException("Cannot stage a table create on the Session Catalog");

[Inline review comment from the PR author on the added line above; see the sketch after this hunk:]
Problem 1. Cannot do our old staging behavior without rewriting considerably more

     }

     // create the table with the session catalog, then wrap it in a staged table that will delete to
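
The first review comment concerns the staging fallback that the thrown exception replaces. A hedged sketch of the gap, not the PR's code: DelegatingCatalogExtension only forwards the TableCatalog, SupportsNamespaces, and FunctionCatalog surface of the delegate, so there is no super.stageCreate(...) to fall back on; the closest a subclass could get is to keep its own reference to the delegate and check whether that catalog happens to support staging. The helper class, method name, and delegate parameter below are illustrative assumptions.

```java
import java.util.Map;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.StagedTable;
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;

// Illustrative only: the type check a fallback would need, since DelegatingCatalogExtension
// exposes no staging API of its own.
final class StagingFallbackSketch {
  private StagingFallbackSketch() {}

  static StagedTable stageCreateOnDelegate(
      TableCatalog delegate,
      Identifier ident,
      StructType schema,
      Transform[] partitions,
      Map<String, String> properties)
      throws TableAlreadyExistsException, NoSuchNamespaceException {
    if (delegate instanceof StagingTableCatalog) {
      // only possible when the underlying session catalog itself supports staging
      return ((StagingTableCatalog) delegate).stageCreate(ident, schema, partitions, properties);
    }
    // otherwise there is no way to reproduce the old create-then-wrap fallback here
    throw new UnsupportedOperationException("Cannot stage a table create on the Session Catalog");
  }
}
```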

@@ -214,7 +169,7 @@ public StagedTable stageReplace(
       }
       catalog = icebergCatalog;
     } else {
-      catalog = getSessionCatalog();
+      throw new UnsupportedOperationException("Cannot stage a table replace on the Session Catalog");
     }

     // attempt to drop the table and fail if it doesn't exist
@@ -246,7 +201,7 @@ public StagedTable stageCreateOrReplace(
       }
       catalog = icebergCatalog;
     } else {
-      catalog = getSessionCatalog();
+      throw new UnsupportedOperationException("Cannot stage a table create or replace on the Session Catalog");
     }

     // drop the table if it exists
@@ -269,7 +224,7 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchT
     if (icebergCatalog.tableExists(ident)) {
       return icebergCatalog.alterTable(ident, changes);
     } else {
-      return getSessionCatalog().alterTable(ident, changes);
+      return super.alterTable(ident, changes);
     }
   }

@@ -278,15 +233,15 @@ public boolean dropTable(Identifier ident) {
     // no need to check table existence to determine which catalog to use. if a table doesn't exist
     // then both are
     // required to return false.
-    return icebergCatalog.dropTable(ident) || getSessionCatalog().dropTable(ident);
+    return icebergCatalog.dropTable(ident) || super.dropTable(ident);
   }

   @Override
   public boolean purgeTable(Identifier ident) {
     // no need to check table existence to determine which catalog to use. if a table doesn't exist
     // then both are
     // required to return false.
-    return icebergCatalog.purgeTable(ident) || getSessionCatalog().purgeTable(ident);
+    return icebergCatalog.purgeTable(ident) || super.purgeTable(ident);
   }

@@ -298,14 +253,16 @@ public void renameTable(Identifier from, Identifier to)
     if (icebergCatalog.tableExists(from)) {
       icebergCatalog.renameTable(from, to);
     } else {
-      getSessionCatalog().renameTable(from, to);
+      super.renameTable(from, to);
     }
   }

+  /**
+   * Removed
+   *
+   *

[Inline review comment from the PR author on the added lines above; see the sketch after this hunk:]
Problem 2. We break all of our Catalog Config code

   @Override
   public final void initialize(String name, CaseInsensitiveStringMap options) {
     super.initialize(name, options);

     if (options.containsKey(CatalogUtil.ICEBERG_CATALOG_TYPE)
         && options
             .get(CatalogUtil.ICEBERG_CATALOG_TYPE)
@@ -323,6 +280,7 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {
     this.createAvroAsIceberg = options.getBoolean("avro-enabled", createAvroAsIceberg);
     this.createOrcAsIceberg = options.getBoolean("orc-enabled", createOrcAsIceberg);
   }
+  **/

   private void validateHmsUri(String catalogHmsUri) {
     if (catalogHmsUri == null) {
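
The second review comment is about configuration: the options the old initialize() read ("parquet-enabled", "avro-enabled", "orc-enabled", the Hive URI check) now have nowhere to be captured, which is why the whole method is commented out above. A brief sketch of why, assuming DelegatingCatalogExtension declares initialize(String, CaseInsensitiveStringMap) as a final no-op (my reading of the Spark versions this targets); the class and field names are illustrative, not the PR's code.

```java
import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;

// Illustrative sketch of "Problem 2": if the superclass marks initialize(...) final,
// a subclass can no longer intercept its own catalog options, so flags like
// "parquet-enabled" are never read.
abstract class ConfigProblemSketch extends DelegatingCatalogExtension {
  private boolean createParquetAsIceberg = false;

  // The override the catalog would want to write, which does not compile if
  // initialize(...) is final on DelegatingCatalogExtension:
  //
  // @Override
  // public void initialize(String name, CaseInsensitiveStringMap options) {
  //   this.createParquetAsIceberg = options.getBoolean("parquet-enabled", createParquetAsIceberg);
  // }

  boolean createParquetAsIceberg() {
    return createParquetAsIceberg;
  }
}
```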

@@ -342,18 +300,6 @@ private void validateHmsUri(String catalogHmsUri) {
         catalogHmsUri);
   }

-  @Override
-  @SuppressWarnings("unchecked")
-  public void setDelegateCatalog(CatalogPlugin sparkSessionCatalog) {
-    if (sparkSessionCatalog instanceof TableCatalog
-        && sparkSessionCatalog instanceof FunctionCatalog
-        && sparkSessionCatalog instanceof SupportsNamespaces) {
-      this.sessionCatalog = (T) sparkSessionCatalog;
-    } else {
-      throw new IllegalArgumentException("Invalid session catalog: " + sparkSessionCatalog);
-    }
-  }
-
   @Override
   public String name() {
     return catalogName;
@@ -373,13 +319,6 @@ private boolean useIceberg(String provider) {
     return false;
   }

-  private T getSessionCatalog() {
-    Preconditions.checkNotNull(
-        sessionCatalog,
-        "Delegated SessionCatalog is missing. "
-            + "Please make sure your are replacing Spark's default catalog, named 'spark_catalog'.");
-    return sessionCatalog;
-  }

   @Override
   public Catalog icebergCatalog() {
@@ -392,9 +331,9 @@ public Catalog icebergCatalog() {
   @Override
   public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
     try {
-      return super.loadFunction(ident);
+      return loadFunction(ident);
     } catch (NoSuchFunctionException e) {
-      return getSessionCatalog().loadFunction(ident);
+      return super.loadFunction(ident);
     }
   }
 }