
Commit 62da19c

fenzhu authored and GitHub Enterprise committed
[CARMEL-6531] Fail to instantiate the custom v2 session catalog: DelegatingCatalog (#1221)
* [CARMEL-6531] Fail to instantiate the custom v2 session catalog: DelegatingCatalog

Cherry-picked from:
https://github.corp.ebay.com/hadoop/spark-longwing3/pull/194
https://github.corp.ebay.com/hadoop/spark-longwing3/pull/203

* Fix UT
1 parent 91bc3d5 commit 62da19c

2 files changed: +102 additions, −12 deletions


sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalog.java

Lines changed: 25 additions & 3 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector.catalog;
 
 import com.google.common.collect.Maps;
+import org.apache.spark.SparkException;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -46,6 +47,8 @@
  * --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.connector.catalog.DelegatingCatalog
  * --conf spark.sql.catalog.spark_catalog.delegating.provider.delta=DeltaCatalog
  * --conf spark.sql.catalog.DeltaCatalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
+ * By default, catalog loading exceptions are ignored; this can be disabled with:
+ * --conf spark.sql.catalog.spark_catalog.catch_catalog_load_exception=false
  */
 public class DelegatingCatalog implements CatalogExtension, StagingTableCatalog {
 
@@ -56,6 +59,8 @@ public class DelegatingCatalog implements CatalogExtension, StagingTableCatalog
   private Map<String, String> catalogs = Maps.newHashMap();
   private CatalogManager catalogManager = null;
   private SupportMetadataTable metadataTableCatalog = null;
+  private boolean catchCatalogLoadException;
+  public static final String CATCH_CATALOG_LOAD_EXCEPTION = "catch_catalog_load_exception";
   private static final String ICEBERG_PROVIDER = "iceberg";
   private static final String HIVE_PROVIDER = "hive";
 
@@ -103,6 +108,7 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {
         catalogs.put(provider, catalogName);
       }
     }
+    catchCatalogLoadException = options.getBoolean(CATCH_CATALOG_LOAD_EXCEPTION, true);
   }
 
   @Override
@@ -119,7 +125,7 @@ public void setCatalogManager(CatalogManager manager) {
     catalogManager = manager;
     CatalogPlugin sessionCatalog = asTableCatalog();
     for (String catalogName : catalogs.values()) {
-      CatalogPlugin catalog = catalogManager.catalog(catalogName);
+      CatalogPlugin catalog = tryLoadCatalog(catalogName);
       if (catalog instanceof CatalogExtension) {
         ((CatalogExtension) catalog).setDelegateCatalog(sessionCatalog);
       }
@@ -138,14 +144,30 @@ private CatalogExtension getV2Catalog(String provider) {
     if (catalogName == null) {
       return null;
     }
-    CatalogPlugin catalog = catalogManager.catalog(catalogName);
-    if (catalog instanceof CatalogExtension) {
+    CatalogPlugin catalog = tryLoadCatalog(catalogName);
+    if (catalog == null) {
+      return null;
+    } else if (catalog instanceof CatalogExtension) {
       return (CatalogExtension) catalog;
     } else {
       throw new RuntimeException(catalog.name() + " is not a CatalogExtension");
     }
   }
 
+  private CatalogPlugin tryLoadCatalog(String name) {
+    CatalogPlugin catalog = null;
+    try {
+      catalog = catalogManager.catalog(name);
+    } catch (Exception e) {
+      if ((e instanceof SparkException) && catchCatalogLoadException) {
+        logger.warn("Exception caught when loading catalog " + name, e);
+      } else {
+        throw e;
+      }
+    }
+    return catalog;
+  }
+
   private StagingTableCatalog getStagingTableCatalog(String provider) {
     CatalogExtension v2Catalog = getV2Catalog(provider);
     if (v2Catalog == null) {
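
For context, the Javadoc above shows how the delegating catalog is wired up. Below is a minimal sketch (not part of this commit) of the same configuration applied through the SparkSession builder: the config keys and class names come from the Javadoc in this diff, the application name is a placeholder, and the last setting uses the flag introduced here to disable the default catch-and-log behavior.

import org.apache.spark.sql.SparkSession

// Sketch only: keys and class names are taken from the DelegatingCatalog Javadoc.
val spark = SparkSession.builder()
  .appName("delegating-catalog-demo")  // placeholder name
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.connector.catalog.DelegatingCatalog")
  .config("spark.sql.catalog.spark_catalog.delegating.provider.delta", "DeltaCatalog")
  .config("spark.sql.catalog.DeltaCatalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  // New in this commit: catalog-load failures are caught and logged by default;
  // setting this to false restores the old fail-fast behavior.
  .config("spark.sql.catalog.spark_catalog.catch_catalog_load_exception", "false")
  .getOrCreate()

Defaulting the flag to true keeps one misconfigured delegated catalog from breaking every session-catalog operation, while false surfaces the SparkException immediately.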

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameDelegatingCatalogSuite.scala

Lines changed: 77 additions & 9 deletions
@@ -202,25 +202,40 @@ private [connector] trait DelegatingCatalogTest[T <: Table]
 
   protected val catalogClassName: String = classOf[DelegatingCatalog].getName
 
-  private val FAKEV2_PROVIDER_KEY: String = V2_SESSION_CATALOG_IMPLEMENTATION.key +
+  private val FAKE_V2_PROVIDER_KEY: String = V2_SESSION_CATALOG_IMPLEMENTATION.key +
     ".delegating.provider." + v2Format
-  private val Delegated_Catalog_Name: String = classOf[StagingInMemoryTableDelegatedCatalog]
+  private val DELEGATED_CATALOG_NAME: String = classOf[StagingInMemoryTableDelegatedCatalog]
     .getSimpleName
-  private val Delegated_Catalog: String = s"spark.sql.catalog.$Delegated_Catalog_Name"
-  private val Delegated_Catalog_Impl: String = classOf[StagingInMemoryTableDelegatedCatalog].getName
+  private val DELEGATED_CATALOG: String = s"spark.sql.catalog.$DELEGATED_CATALOG_NAME"
+  private val DELEGATED_CATALOG_IMPL: String = classOf[StagingInMemoryTableDelegatedCatalog].getName
+
+  private val NONEXISTENT_FAKE_V2_PROVIDER_KEY: String = V2_SESSION_CATALOG_IMPLEMENTATION.key +
+    ".delegating.provider." + v2Format + "_not_exist"
+  private val NONEXISTENT_DELEGATED_CATALOG_NAME: String
+    = classOf[StagingInMemoryTableDelegatedCatalog].getSimpleName + "NotExist"
+  private val NONEXISTENT_DELEGATED_CATALOG: String
+    = s"spark.sql.catalog.$NONEXISTENT_DELEGATED_CATALOG_NAME"
+  private val NONEXISTENT_DELEGATED_CATALOG_IMPL: String
+    = classOf[StagingInMemoryTableDelegatedCatalog].getName + "NotExist"
 
   before {
     spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION.key, catalogClassName)
-    spark.conf.set(FAKEV2_PROVIDER_KEY, Delegated_Catalog_Name)
-    spark.conf.set(Delegated_Catalog, Delegated_Catalog_Impl)
+    spark.conf.set(FAKE_V2_PROVIDER_KEY, DELEGATED_CATALOG_NAME)
+    spark.conf.set(DELEGATED_CATALOG, DELEGATED_CATALOG_IMPL)
+    // add a catalog configuration that fails to load
+    spark.conf.set(NONEXISTENT_FAKE_V2_PROVIDER_KEY, NONEXISTENT_DELEGATED_CATALOG_NAME)
+    spark.conf.set(NONEXISTENT_DELEGATED_CATALOG, NONEXISTENT_DELEGATED_CATALOG_IMPL)
   }
 
   override def afterEach(): Unit = {
     super.afterEach()
-    catalog(Delegated_Catalog_Name).asInstanceOf[StagingInMemoryTableDelegatedCatalog].clearTables()
+    catalog(DELEGATED_CATALOG_NAME).asInstanceOf[StagingInMemoryTableDelegatedCatalog].clearTables()
     spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
-    spark.conf.unset(FAKEV2_PROVIDER_KEY)
-    spark.conf.unset(Delegated_Catalog)
+    spark.conf.unset(FAKE_V2_PROVIDER_KEY)
+    spark.conf.unset(DELEGATED_CATALOG)
+    // unset the catalog configuration that fails to load
+    spark.conf.unset(NONEXISTENT_FAKE_V2_PROVIDER_KEY)
+    spark.conf.unset(NONEXISTENT_DELEGATED_CATALOG)
   }
 
   protected def verifyTable(tableName: String, expected: DataFrame): Unit
@@ -311,3 +326,56 @@ private [connector] trait DelegatingCatalogTest[T <: Table]
     verifyTable(t1, Seq(("c", "d")).toDF("id", "data"))
   }
 }
+
+class DataSourceV2DataFrameDelegatingCatalogFailLoadingSuite
+  extends QueryTest
+  with DelegatingCatalogFailLoadingTest[InMemoryTable] {
+
+  test("showTables should fail with SparkException when the delegated catalog can't be loaded") {
+    val expectedErrorMsg
+      = "Cannot find catalog plugin class for catalog '" + NONEXISTENT_DELEGATED_CATALOG_NAME + "':"
+    val msg = intercept[org.apache.spark.SparkException] {
+      sql("show tables")
+      Catalogs.load(NONEXISTENT_DELEGATED_CATALOG_NAME, spark.sessionState.conf)
+    }.getMessage
+    assert(msg.contains(expectedErrorMsg))
+  }
+}
+
+private [connector] trait DelegatingCatalogFailLoadingTest[T <: Table]
+  extends QueryTest
+  with SharedSparkSession
+  with BeforeAndAfter {
+
+  protected val v2Format: String = classOf[FakeV2Provider].getName
+
+  protected val catalogClassName: String = classOf[DelegatingCatalog].getName
+
+  private val NONEXISTENT_FAKE_V2_PROVIDER_KEY: String = V2_SESSION_CATALOG_IMPLEMENTATION.key +
+    ".delegating.provider." + v2Format + "_not_exist"
+  protected val NONEXISTENT_DELEGATED_CATALOG_NAME: String
+    = classOf[StagingInMemoryTableDelegatedCatalog].getSimpleName + "NotExist"
+  private val NONEXISTENT_DELEGATED_CATALOG: String
+    = s"spark.sql.catalog.$NONEXISTENT_DELEGATED_CATALOG_NAME"
+  private val NONEXISTENT_DELEGATED_CATALOG_IMPL: String
+    = classOf[StagingInMemoryTableDelegatedCatalog].getName + "NotExist"
+
+  private val CATCH_CATALOG_LOAD_EXCEPTION_KEY: String = V2_SESSION_CATALOG_IMPLEMENTATION.key +
+    '.' + DelegatingCatalog.CATCH_CATALOG_LOAD_EXCEPTION
+
+  before {
+    spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION.key, catalogClassName)
+    spark.conf.set(CATCH_CATALOG_LOAD_EXCEPTION_KEY, false)
+    // add a catalog configuration that fails to load
+    spark.conf.set(NONEXISTENT_FAKE_V2_PROVIDER_KEY, NONEXISTENT_DELEGATED_CATALOG_NAME)
+    spark.conf.set(NONEXISTENT_DELEGATED_CATALOG, NONEXISTENT_DELEGATED_CATALOG_IMPL)
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+    // unset the catalog configuration that fails to load
+    spark.conf.unset(NONEXISTENT_FAKE_V2_PROVIDER_KEY)
+    spark.conf.unset(NONEXISTENT_DELEGATED_CATALOG)
+  }
+}
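
The suite above disables catch_catalog_load_exception and asserts the fail-fast SparkException. For contrast, here is a hypothetical sketch of the default path (flag left at true), in which the broken delegated catalog is logged and skipped so queries through the session catalog are expected to keep working. The provider suffix and class names below are invented for illustration, following the key pattern used in these tests.

// Hypothetical: register a delegated catalog whose implementation class does not exist.
spark.conf.set(
  "spark.sql.catalog.spark_catalog.delegating.provider.csv_not_exist", "BrokenCatalog")
spark.conf.set("spark.sql.catalog.BrokenCatalog", "com.example.DoesNotExist")

// With catch_catalog_load_exception at its default (true), tryLoadCatalog logs a
// warning and returns null, so this should succeed rather than throw.
spark.sql("SHOW TABLES").show()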
