
Commit e613954

fenzhu (GitHub Enterprise) authored and committed
[CARMEL-7500][CARMEL-5963] CartesianProduct may introduce many concur… (apache#272)
1 parent cfc666b commit e613954

3 files changed: +93 -1 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 12 additions & 0 deletions

@@ -1684,6 +1684,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val CROSS_JOINS_ENABLED_FOR_JDBC_RELATION = buildConf(
+    "spark.carmel.sql.crossJoin.forJdbc.enabled")
+    .internal()
+    .doc("When false, we will throw an error if a query contains a cartesian product with " +
+      "jdbc relation in same stage.")
+    .version("2.0.0")
+    .booleanConf
+    .createWithDefault(true)
+
   val GROUP_BY_ORDINAL = buildConf("spark.sql.groupByOrdinal")
     .doc("When true, the ordinal numbers in group by clauses are treated as the position " +
       "in the select list. When false, the ordinal numbers are ignored.")

@@ -5566,6 +5575,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
 
+  def crossJoinForJdbcRelationEnabled: Boolean =
+    getConf(SQLConf.CROSS_JOINS_ENABLED_FOR_JDBC_RELATION)
+
   override def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
 
   def jsonGeneratorIgnoreNullFields: Boolean = getConf(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS)
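
Since the new entry is an ordinary boolean SQLConf, it can be toggled per session on a build that carries this patch. A minimal sketch of flipping the guard (the session setup is illustrative; only the key name and default come from the diff above):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("cross-join-jdbc-guard-demo") // illustrative app name
  .getOrCreate()

// Enable the guard by disabling cartesian products over JDBC relations.
// This is the key defined by CROSS_JOINS_ENABLED_FOR_JDBC_RELATION above;
// it defaults to true (guard off).
spark.conf.set("spark.carmel.sql.crossJoin.forJdbc.enabled", "false")

// Equivalent from SQL:
spark.sql("SET spark.carmel.sql.crossJoin.forJdbc.enabled=false")
```

Because the entry is marked .internal(), it is omitted from generated configuration docs but remains settable like any other key.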

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala

Lines changed: 20 additions & 1 deletion

@@ -23,7 +23,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, Predicate, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
-import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan}
+import org.apache.spark.sql.execution.{DataSourceScanExec, ExternalAppendOnlyUnsafeRowArray, LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation
+import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.util.CompletionIterator

@@ -72,6 +74,14 @@ case class CartesianProductExec(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
+    if (!conf.crossJoinForJdbcRelationEnabled &&
+      (containsJDBCRelationInSameStage(left) || containsJDBCRelationInSameStage(right))) {
+      throw new SparkException(
+        s"""Detected JDBC Relation in CartesianProduct, abort. Refer to
+           | https://wiki.vip.corp.ebay.com/x/QhCrJw#id-07FAQ-DetectedJDBCRelationinCartesianProduct
+           | for more details."""
+          .stripMargin)
+    }
     val numOutputRows = longMetric("numOutputRows")
 
     val leftResults = left.execute().asInstanceOf[RDD[UnsafeRow]]

@@ -102,6 +112,15 @@
     }
   }
 
+  private def containsJDBCRelationInSameStage(plan: SparkPlan): Boolean = {
+    plan match {
+      case scan: DataSourceScanExec if scan.relation.isInstanceOf[JDBCRelation] => true
+      case ex: Exchange => false
+      case _ =>
+        !plan.isInstanceOf[LeafExecNode] && plan.children.exists(containsJDBCRelationInSameStage)
+    }
+  }
+
   override protected def withNewChildrenInternal(
       newLeft: SparkPlan, newRight: SparkPlan): CartesianProductExec =
     copy(left = newLeft, right = newRight)
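
The traversal encodes the "same stage" notion directly: an Exchange node is where Spark splits a plan into stages, so a JDBC scan sitting below one is computed by a different stage and does not feed the cartesian product's own tasks. A standalone sketch of the same search over a toy plan tree (Node, JdbcScan, Op and friends are illustrative stand-ins, not Spark classes):

```scala
// Toy plan tree, just enough structure to show the traversal.
sealed trait Node { def children: Seq[Node] }
case object JdbcScan extends Node { val children: Seq[Node] = Nil }
case object OtherLeaf extends Node { val children: Seq[Node] = Nil }
final case class Exchange(child: Node) extends Node {
  val children: Seq[Node] = Seq(child)
}
final case class Op(children: Node*) extends Node

def jdbcInSameStage(plan: Node): Boolean = plan match {
  case JdbcScan => true      // JDBC scan reachable without crossing a shuffle
  case _: Exchange => false  // stage boundary: stop descending
  case other => other.children.exists(jdbcInSameStage)
}

// Behind an exchange the scan belongs to another stage: not flagged.
assert(!jdbcInSameStage(Op(Exchange(JdbcScan), OtherLeaf)))
// Feeding the operator directly in the same stage: flagged.
assert(jdbcInSameStage(Op(JdbcScan, OtherLeaf)))
```

The extra !plan.isInstanceOf[LeafExecNode] guard in the real method is belt and braces: a leaf that is not a matching scan has no children to recurse into anyway.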

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 61 additions & 0 deletions

@@ -35,9 +35,11 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils, DateTimeTestUtils}
 import org.apache.spark.sql.execution.{DataSourceScanExec, ExtendedMode, ProjectExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.command.{ExplainCommand, ShowCreateTableCommand}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRelation, JdbcUtils}
+import org.apache.spark.sql.execution.joins.CartesianProductExec
 import org.apache.spark.sql.execution.metric.InputOutputMetricsHelper
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._

@@ -2057,4 +2059,63 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     val df = sql("SELECT * FROM composite_name WHERE `last name` = 'smith'")
     assert(df.collect.toSet === Set(Row("smith", 1)))
   }
+
+  test("Check CartesianProducts For JDBC Relation") {
+    withTable("hdfs_foobar") {
+      sql(
+        """create table hdfs_foobar using parquet as
+          |select * from foobar
+          |""".stripMargin)
+
+      sql(
+        """create or replace temporary view all_foobar as
+          |select * from hdfs_foobar
+          |union all select * from foobar
+          |""".stripMargin)
+
+      sql(
+        """create or replace temporary view result as
+          |(select t_name, count(*) as count from all_foobar c
+          |join (select 'mary' as t_name) l
+          |on c.name like l.t_name
+          |group BY 1)
+          |""".stripMargin)
+
+      Seq(true, false).foreach(crossJoinForJdbcRelationEnabled => {
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+          SQLConf.CROSS_JOINS_ENABLED_FOR_JDBC_RELATION.key ->
+            crossJoinForJdbcRelationEnabled.toString) {
+
+          val df = sql("SELECT * FROM result")
+
+          // scalastyle:off println
+          println(s"optimizedPlan: ${df.queryExecution.optimizedPlan}")
+          println(s"executedPlan: ${df.queryExecution.executedPlan}")
+          // scalastyle:on println
+
+          val plan = df.queryExecution.executedPlan match {
+            case ae: AdaptiveSparkPlanExec => ae.executedPlan
+            case p => p
+          }
+
+          assert(plan.collectFirst({
+            case cartesianProduct: CartesianProductExec => cartesianProduct
+          }).nonEmpty)
+
+          assert(plan.collectFirst({
+            case jdbc: DataSourceScanExec if jdbc.relation.isInstanceOf[JDBCRelation] => jdbc
+          }).nonEmpty)
+
+          if (crossJoinForJdbcRelationEnabled) {
+            df.collect()
+          } else {
+            val message = intercept[SparkException] {
+              df.collect()
+            }.getMessage
+            assert(message.contains(s"Detected JDBC Relation in CartesianProduct"))
+          }
+        }
+      })
+    }
+  }
 }
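
The test's join condition, c.name like l.t_name, gives the planner no equi-join keys, and setting AUTO_BROADCASTJOIN_THRESHOLD to -1 rules out the broadcast strategies, so the inner join can only be planned as CartesianProductExec with the JDBC scan of foobar in the same stage. A rough spark-shell sketch of the same failure mode outside the test harness (the JDBC URL and table name are placeholders):

```scala
// Assumes a spark-shell on a build with this patch, so `spark` is in scope.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.carmel.sql.crossJoin.forJdbc.enabled", "false")

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:h2:mem:testdb") // placeholder connection URL
  .option("dbtable", "foobar")         // placeholder table name
  .load()
jdbcDF.createOrReplaceTempView("foobar")

// No equi-join key and no broadcast: this plans a cartesian product directly
// over the JDBC scan, so collect() should throw a SparkException containing
// "Detected JDBC Relation in CartesianProduct".
spark.sql(
  """select t_name, count(*) as count from foobar c
    |join (select 'mary' as t_name) l on c.name like l.t_name
    |group by 1""".stripMargin).collect()
```

When the guard conf is left at its default of true, the same query runs to completion; the test exercises both paths.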
