Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ store 12783 0.88775510204081632653 6 6
store 11075 0.89743589743589743590 7 7
store 12889 0.95652173913043478261 8 8
store 1939 0.99000000000000000000 9 9
store 12975 1.00000000000000000000 10 10
store 10455 1.00000000000000000000 10 10
store 4333 1.00000000000000000000 10 10
store 12975 1.00000000000000000000 10 10
web 10485 0.48863636363636363636 1 1
web 4483 0.52688172043010752688 2 2
web 8833 0.58241758241758241758 3 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
-- !query schema
struct<c_last_name:string,c_first_name:string,c_salutation:string,c_preferred_cust_flag:string,ss_ticket_number:int,cnt:bigint>
-- !query output
Ransom Thomas Sir N 872 5
Sauer Larry Mr. N 215795 5
Valle Chandra Dr. N 45338 5
Richardson Harry Mr. Y 85055 5
Sauer Larry Mr. N 215795 5
Ransom Thomas Sir N 872 5
114 changes: 82 additions & 32 deletions sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,50 +97,95 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp
""".stripMargin)
}

private def runQuery(query: String, goldenFile: File): Unit = {
val (schema, output) = handleExceptions(getNormalizedResult(spark, query))
val queryString = query.trim
val outputString = output.mkString("\n").replaceAll("\\s+$", "")
if (regenerateGoldenFiles) {
val goldenOutput = {
s"-- Automatically generated by ${getClass.getSimpleName}\n\n" +
s"-- !query schema\n" +
schema + "\n" +
s"-- !query output\n" +
outputString +
"\n"
private def runQuery(
query: String,
goldenFile: File,
conf: Seq[(String, String)],
needSort: Boolean): Unit = {
withSQLConf(conf: _*) {
try {
val (schema, output) = handleExceptions(getNormalizedResult(spark, query))
val queryString = query.trim
val outputString = output.mkString("\n").replaceAll("\\s+$", "")
if (regenerateGoldenFiles) {
val goldenOutput = {
s"-- Automatically generated by ${getClass.getSimpleName}\n\n" +
s"-- !query schema\n" +
schema + "\n" +
s"-- !query output\n" +
outputString +
"\n"
}
val parent = goldenFile.getParentFile
if (!parent.exists()) {
assert(parent.mkdirs(), "Could not create directory: " + parent)
}
stringToFile(goldenFile, goldenOutput)
}

// Read back the golden file.
val (expectedSchema, expectedOutput) = {
val goldenOutput = fileToString(goldenFile)
val segments = goldenOutput.split("-- !query.*\n")

// query has 3 segments, plus the header
assert(segments.size == 3,
s"Expected 3 blocks in result file but got ${segments.size}. " +
"Try regenerate the result files.")

(segments(1).trim, segments(2).replaceAll("\\s+$", ""))
}

assertResult(expectedSchema, s"Schema did not match\n$queryString") {
schema
}
if (needSort) {
val expectSorted = expectedOutput.split("\n").sorted.map(_.trim)
.mkString("\n").replaceAll("\\s+$", "")
val outputSorted = output.sorted.map(_.trim).mkString("\n").replaceAll("\\s+$", "")
assertResult(expectSorted, s"Result did not match\n$queryString") {
outputSorted
}
} else {
assertResult(expectedOutput, s"Result did not match\n$queryString") {
outputString
}
}
} catch {
case e: Throwable =>
val configs = conf.map {
case (k, v) => s"$k=$v"
}
throw new Exception(s"${e.getMessage}\nError using configs:\n${configs.mkString("\n")}")
}
val parent = goldenFile.getParentFile
if (!parent.exists()) {
assert(parent.mkdirs(), "Could not create directory: " + parent)
}
stringToFile(goldenFile, goldenOutput)
}
}

// Read back the golden file.
val (expectedSchema, expectedOutput) = {
val goldenOutput = fileToString(goldenFile)
val segments = goldenOutput.split("-- !query.*\n")
// Config overrides for the sort-merge-join run: the auto-broadcast threshold is set to "-1"
// and sort-merge join is marked as preferred.
// NOTE(review): "-1" presumably disables broadcast joins entirely — confirm against SQLConf.
val sortMergeJoinConf = Map(
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.PREFER_SORTMERGEJOIN.key -> "true")

// query has 3 segments, plus the header
assert(segments.size == 3,
s"Expected 3 blocks in result file but got ${segments.size}. " +
"Try regenerate the result files.")
// Config overrides for the broadcast-hash-join run: only the auto-broadcast threshold is set,
// to 10485760 (= 10 * 1024 * 1024).
// NOTE(review): the unit is presumably bytes (i.e. 10 MiB) — confirm against SQLConf.
val broadcastHashJoinConf = Map(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10485760")

(segments(1).trim, segments(2).replaceAll("\\s+$", ""))
}
// Config overrides for the shuffled-hash-join run: the auto-broadcast threshold is set to "-1"
// and the force-apply flag for shuffled hash join is turned on.
// The second key is spelled out as a raw string rather than through a SQLConf entry.
// NOTE(review): presumably no public SQLConf constant exists for it — confirm.
val shuffledHashJoinConf = Map(
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
"spark.sql.join.forceApplyShuffledHashJoin" -> "true")

assertResult(expectedSchema, s"Schema did not match\n$queryString") { schema }
assertResult(expectedOutput, s"Result did not match\n$queryString") { outputString }
}
val joinConfSet: Set[Map[String, String]] =
Copy link
Member

@HyukjinKwon HyukjinKwon Nov 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, why is this a Set? Then joinConfSet.head won't be deterministic below, and there would be no point in having needSort.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me fix it together in #34698.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, a Seq may be better.

Set(sortMergeJoinConf, broadcastHashJoinConf, shuffledHashJoinConf);

if (tpcdsDataPath.nonEmpty) {
tpcdsQueries.foreach { name =>
val queryString = resourceToString(s"tpcds/$name.sql",
classLoader = Thread.currentThread().getContextClassLoader)
test(name) {
val goldenFile = new File(s"$baseResourcePath/v1_4", s"$name.sql.out")
runQuery(queryString, goldenFile)
runQuery(queryString, goldenFile, joinConfSet.head.toSeq, false)
if (!regenerateGoldenFiles) {
joinConfSet.tail.foreach { conf =>
runQuery(queryString, goldenFile, conf.toSeq, true)
}
}
}
}

Expand All @@ -149,7 +194,12 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp
classLoader = Thread.currentThread().getContextClassLoader)
test(s"$name-v2.7") {
val goldenFile = new File(s"$baseResourcePath/v2_7", s"$name.sql.out")
runQuery(queryString, goldenFile)
runQuery(queryString, goldenFile, joinConfSet.head.toSeq, false)
if (!regenerateGoldenFiles) {
joinConfSet.tail.foreach { conf =>
runQuery(queryString, goldenFile, conf.toSeq, true)
}
}
}
}
} else {
Expand Down