From b39f2d6acf25726d99bf2c2fa84ba6a227d0d909 Mon Sep 17 00:00:00 2001 From: Warren Zhu Date: Tue, 4 Oct 2022 13:38:17 -0700 Subject: [PATCH 1/9] [SPARK-40636][CORE] Fix wrong remained shuffles log in BlockManagerDecommissioner ### What changes were proposed in this pull request? Fix the incorrect count of remaining shuffles logged by BlockManagerDecommissioner. ### Why are the changes needed? BlockManagerDecommissioner should log the correct number of remaining shuffles. The current log reports the total number of shuffles as the remaining count. ``` 4 of 24 local shuffles are added. In total, 24 shuffles are remained. 2022-09-30 17:42:15.035 PDT 0 of 24 local shuffles are added. In total, 24 shuffles are remained. 2022-09-30 17:42:45.069 PDT 0 of 24 local shuffles are added. In total, 24 shuffles are remained. ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manually tested. Closes #38078 from warrenzhu25/deco-log. Authored-by: Warren Zhu Signed-off-by: Dongjoon Hyun --- .../org/apache/spark/storage/BlockManagerDecommissioner.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index 6e3cf9c9b4116..3a698ce4f7079 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -280,8 +280,9 @@ private[storage] class BlockManagerDecommissioner( .sortBy(b => (b.shuffleId, b.mapId)) shufflesToMigrate.addAll(newShufflesToMigrate.map(x => (x, 0)).asJava) migratingShuffles ++= newShufflesToMigrate + val remainedShuffles = migratingShuffles.size - numMigratedShuffles.get() logInfo(s"${newShufflesToMigrate.size} of ${localShuffles.size} local shuffles " + - s"are added. In total, ${migratingShuffles.size} shuffles are remained.") + s"are added. In total, $remainedShuffles shuffles are remained.") // Update the threads doing migrations val livePeerSet = bm.getPeers(false).toSet From 633db3b8b82d8f43fddbec623800929f1e5a4139 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 4 Oct 2022 13:39:33 -0700 Subject: [PATCH 2/9] [SPARK-40648][YARN][TESTS] Add `@ExtendedLevelDBTest` to `LevelDB` relevant tests in the `yarn` module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? SPARK-40490 made the test cases related to `YarnShuffleIntegrationSuite` verify the `registeredExecFile` reload scenario again, so this PR adds `ExtendedLevelDBTest` to the `LevelDB`-relevant tests in the `yarn` module so that macOS/Apple Silicon builds can skip them through `-Dtest.exclude.tags=org.apache.spark.tags.ExtendedLevelDBTest`. ### Why are the changes needed? Following the existing convention, adding `ExtendedLevelDBTest` to the LevelDB-relevant tests lets the `yarn` module skip them through `-Dtest.exclude.tags=org.apache.spark.tags.ExtendedLevelDBTest` on macOS/Apple Silicon. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Pass GitHub Actions - Manual test on macOS/Apple Silicon ``` mvn clean install -pl resource-managers/yarn -Pyarn -am -DskipTests mvn clean install -pl resource-managers/yarn -Pyarn -Dtest.exclude.tags=org.apache.spark.tags.ExtendedLevelDBTest ``` **Before** ``` *** RUN ABORTED *** java.lang.UnsatisfiedLinkError: Could not load library. 
Reasons: [no leveldbjni64-1.8 in java.library.path, no leveldbjni-1.8 in java.library.path, no leveldbjni in java.library.path, /Users/yangjie01/SourceCode/git/spark-source/resource-managers/yarn/target/tmp/libleveldbjni-64-1-7057248091178764836.8: dlopen(/Users/yangjie01/SourceCode/git/spark-source/resource-managers/yarn/target/tmp/libleveldbjni-64-1-7057248091178764836.8, 1): no suitable image found. Did find: /Users/yangjie01/SourceCode/git/spark-source/resource-managers/yarn/target/tmp/libleveldbjni-64-1-7057248091178764836.8: no matching architecture in universal wrapper /Users/yangjie01/SourceCode/git/spark-source/resource-managers/yarn/target/tmp/libleveldbjni-64-1-7057248091178764836.8: no matching architecture in universal wrapper] at org.fusesource.hawtjni.runtime.Library.doLoad(Library.java:182) at org.fusesource.hawtjni.runtime.Library.load(Library.java:140) at org.fusesource.leveldbjni.JniDBFactory.(JniDBFactory.java:48) at org.apache.spark.network.util.LevelDBProvider.initLevelDB(LevelDBProvider.java:48) at org.apache.spark.network.util.DBProvider.initDB(DBProvider.java:40) at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.(ExternalShuffleBlockResolver.java:131) at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.(ExternalShuffleBlockResolver.java:100) at org.apache.spark.network.shuffle.ExternalBlockHandler.(ExternalBlockHandler.java:90) at org.apache.spark.network.yarn.YarnShuffleService.serviceInit(YarnShuffleService.java:276) at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164) ... ``` **After** ``` Run completed in 9 minutes, 46 seconds. Total number of tests run: 164 Suites: completed 23, aborted 0 Tests: succeeded 164, failed 0, canceled 1, ignored 0, pending 0 All tests passed. ``` Closes #38095 from LuciferYang/SPARK-40648. 
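For reference, a minimal sketch of how a ScalaTest tag annotation such as `ExtendedLevelDBTest` is typically declared, so that its fully qualified name can be passed to `-Dtest.exclude.tags`. This is an assumption about the common pattern, not the exact source of Spark's `common/tags` module:
```
// Hypothetical sketch: a ScalaTest tag annotation; suites carrying it can be
// excluded by listing the annotation's fully qualified name in the exclude-tags setting.
package org.apache.spark.tags;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.scalatest.TagAnnotation;

@TagAnnotation                                  // lets ScalaTest treat this annotation as a test tag
@Retention(RetentionPolicy.RUNTIME)             // must be retained at run time to be filterable
@Target({ElementType.METHOD, ElementType.TYPE}) // usable on whole suites or individual tests
public @interface ExtendedLevelDBTest { }
```
Suites annotated this way (as in the diff below) are skipped when the tag name is wired into the build's exclude-tags property.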
Authored-by: yangjie01 Signed-off-by: Dongjoon Hyun --- .../deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala | 3 ++- .../spark/deploy/yarn/YarnShuffleIntegrationSuite.scala | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala index 692980b96f208..3857fedb7aabf 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark._ import org.apache.spark.internal.config._ import org.apache.spark.network.shuffledb.DBBackend import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor} -import org.apache.spark.tags.ExtendedYarnTest +import org.apache.spark.tags.{ExtendedLevelDBTest, ExtendedYarnTest} /** * SPARK-34828: Integration test for the external shuffle service with an alternate name and @@ -77,6 +77,7 @@ abstract class YarnShuffleAlternateNameConfigSuite extends YarnShuffleIntegratio } } } +@ExtendedLevelDBTest @ExtendedYarnTest class YarnShuffleAlternateNameConfigWithLevelDBBackendSuite extends YarnShuffleAlternateNameConfigSuite { diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala index deb95773676d1..923925b222e0a 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.internal.config.Network._ import org.apache.spark.network.shuffle.ShuffleTestAccessor import org.apache.spark.network.shuffledb.DBBackend import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor} -import org.apache.spark.tags.ExtendedYarnTest +import org.apache.spark.tags.{ExtendedLevelDBTest, ExtendedYarnTest} /** * Integration test for the external shuffle service with a yarn mini-cluster @@ -87,6 +87,7 @@ abstract class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite { } } +@ExtendedLevelDBTest @ExtendedYarnTest class YarnShuffleIntegrationWithLevelDBBackendSuite extends YarnShuffleIntegrationSuite { @@ -118,6 +119,7 @@ abstract class YarnShuffleAuthSuite extends YarnShuffleIntegrationSuite { } } +@ExtendedLevelDBTest @ExtendedYarnTest class YarnShuffleAuthWithLevelDBBackendSuite extends YarnShuffleAuthSuite { override protected def dbBackend: DBBackend = DBBackend.LEVELDB From 9bc1cb9029482914d92de2b0557ad2c336061d0f Mon Sep 17 00:00:00 2001 From: Serge Rielau Date: Tue, 4 Oct 2022 16:11:21 -0700 Subject: [PATCH 3/9] [SPARK-40585][SQL] Support double quoted identifiers ### What changes were proposed in this pull request? In this PR I propose a new session config: spark.sql.ansi.double_quoted_identifiers (true | false) When true the parser will interpret a double quoted string not as a string-literal, but - in compliance with ANSI SQL - as an identifier. We do this by splitting the double-quoted literal from the STRING token, onto its own BACKQUOTED_STRING token in the lexer. 
in the grammar we replace all STRING references with a rule stringLit covering STRING and BACKQUOTED_STRING with the later being conditional on the config setting being false. (Note there already is a rule stringLiteral, hence the a tad quirky name). Similarly quotedIdentifier is extended with BACKQUOTED_STRING conditional on the config being true. Note that this is NOT PERFECT. The escape logic for strings (backslash) is different from that of identifiers (double-doublequotes). Unfortunately I do not know how to change this, since introducing a NEW token DOUBLE_QUOTED_IDENTIFIER has proven to break STRING - presumably due to the overlap in the pattern in the lexer. At this point I consider this an edge-case. ### Why are the changes needed? ANSI requires quotation of identifiers to use double quotes. We have seen customer requests for support especially around column aliases. But it makes sense to have a holistic fix rather than a context specific application. ### Does this PR introduce _any_ user-facing change? Yes, this is a new config introducing a new feature. It is not a breaking change, though. ### How was this patch tested? double_quoted_identifiers.sql was added to sqltests Closes #38022 from srielau/SPARK-40585-double-quoted-identifier. Lead-authored-by: Serge Rielau Co-authored-by: Serge Rielau Co-authored-by: Gengliang Wang Signed-off-by: Gengliang Wang --- .../spark/sql/catalyst/parser/SqlBaseLexer.g4 | 5 +- .../sql/catalyst/parser/SqlBaseParser.g4 | 111 ++-- .../sql/catalyst/parser/AstBuilder.scala | 134 ++-- .../sql/catalyst/parser/ParseDriver.scala | 18 + .../sql/catalyst/parser/ParserUtils.scala | 2 +- .../apache/spark/sql/internal/SQLConf.scala | 9 + .../catalyst/parser/ParserUtilsSuite.scala | 8 +- .../spark/sql/execution/SparkSqlParser.scala | 40 +- .../inputs/double-quoted-identifiers.sql | 107 +++ .../results/double-quoted-identifiers.sql.out | 608 ++++++++++++++++++ 10 files changed, 937 insertions(+), 105 deletions(-) create mode 100644 sql/core/src/test/resources/sql-tests/inputs/double-quoted-identifiers.sql create mode 100644 sql/core/src/test/resources/sql-tests/results/double-quoted-identifiers.sql.out diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index 1cbd6d24deaec..d2ccd3dfdd9bd 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -400,11 +400,14 @@ HENT_END: '*/'; STRING : '\'' ( ~('\''|'\\') | ('\\' .) )* '\'' - | '"' ( ~('"'|'\\') | ('\\' .) )* '"' | 'R\'' (~'\'')* '\'' | 'R"'(~'"')* '"' ; +DOUBLEQUOTED_STRING + :'"' ( ~('"'|'\\') | ('\\' .) )* '"' + ; + BIGINT_LITERAL : DIGIT+ 'L' ; diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index f398ddd76f712..5b61c767fbe3b 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -35,6 +35,11 @@ options { tokenVocab = SqlBaseLexer; } * When true, the behavior of keywords follows ANSI SQL standard. */ public boolean SQL_standard_keyword_behavior = false; + + /** + * When true, double quoted literals are identifiers rather than STRINGs. 
+ */ + public boolean double_quoted_identifiers = false; } singleStatement @@ -70,7 +75,7 @@ statement | ctes? dmlStatementNoWith #dmlStatement | USE multipartIdentifier #use | USE namespace multipartIdentifier #useNamespace - | SET CATALOG (identifier | STRING) #setCatalog + | SET CATALOG (identifier | stringLit) #setCatalog | CREATE namespace (IF NOT EXISTS)? multipartIdentifier (commentSpec | locationSpec | @@ -82,7 +87,7 @@ statement | DROP namespace (IF EXISTS)? multipartIdentifier (RESTRICT | CASCADE)? #dropNamespace | SHOW namespaces ((FROM | IN) multipartIdentifier)? - (LIKE? pattern=STRING)? #showNamespaces + (LIKE? pattern=stringLit)? #showNamespaces | createTableHeader (LEFT_PAREN createOrReplaceTableColTypeList RIGHT_PAREN)? tableProvider? createTableClauses (AS? query)? #createTable @@ -132,7 +137,7 @@ statement LEFT_PAREN columns=qualifiedColTypeWithPositionList RIGHT_PAREN #hiveReplaceColumns | ALTER TABLE multipartIdentifier (partitionSpec)? - SET SERDE STRING (WITH SERDEPROPERTIES propertyList)? #setTableSerDe + SET SERDE stringLit (WITH SERDEPROPERTIES propertyList)? #setTableSerDe | ALTER TABLE multipartIdentifier (partitionSpec)? SET SERDEPROPERTIES propertyList #setTableSerDe | ALTER (TABLE | VIEW) multipartIdentifier ADD (IF NOT EXISTS)? @@ -158,27 +163,27 @@ statement (OPTIONS propertyList)? #createTempViewUsing | ALTER VIEW multipartIdentifier AS? query #alterViewQuery | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)? - multipartIdentifier AS className=STRING + multipartIdentifier AS className=stringLit (USING resource (COMMA resource)*)? #createFunction | DROP TEMPORARY? FUNCTION (IF EXISTS)? multipartIdentifier #dropFunction | EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)? statement #explain | SHOW TABLES ((FROM | IN) multipartIdentifier)? - (LIKE? pattern=STRING)? #showTables + (LIKE? pattern=stringLit)? #showTables | SHOW TABLE EXTENDED ((FROM | IN) ns=multipartIdentifier)? - LIKE pattern=STRING partitionSpec? #showTableExtended + LIKE pattern=stringLit partitionSpec? #showTableExtended | SHOW TBLPROPERTIES table=multipartIdentifier (LEFT_PAREN key=propertyKey RIGHT_PAREN)? #showTblProperties | SHOW COLUMNS (FROM | IN) table=multipartIdentifier ((FROM | IN) ns=multipartIdentifier)? #showColumns | SHOW VIEWS ((FROM | IN) multipartIdentifier)? - (LIKE? pattern=STRING)? #showViews + (LIKE? pattern=stringLit)? #showViews | SHOW PARTITIONS multipartIdentifier partitionSpec? #showPartitions | SHOW identifier? FUNCTIONS ((FROM | IN) ns=multipartIdentifier)? - (LIKE? (legacy=multipartIdentifier | pattern=STRING))? #showFunctions + (LIKE? (legacy=multipartIdentifier | pattern=stringLit))? #showFunctions | SHOW CREATE TABLE multipartIdentifier (AS SERDE)? #showCreateTable | SHOW CURRENT namespace #showCurrentNamespace - | SHOW CATALOGS (LIKE? pattern=STRING)? #showCatalogs + | SHOW CATALOGS (LIKE? pattern=stringLit)? #showCatalogs | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction | (DESC | DESCRIBE) namespace EXTENDED? multipartIdentifier #describeNamespace @@ -186,16 +191,16 @@ statement multipartIdentifier partitionSpec? describeColName? #describeRelation | (DESC | DESCRIBE) QUERY? 
query #describeQuery | COMMENT ON namespace multipartIdentifier IS - comment=(STRING | NULL) #commentNamespace - | COMMENT ON TABLE multipartIdentifier IS comment=(STRING | NULL) #commentTable + comment #commentNamespace + | COMMENT ON TABLE multipartIdentifier IS comment #commentTable | REFRESH TABLE multipartIdentifier #refreshTable | REFRESH FUNCTION multipartIdentifier #refreshFunction - | REFRESH (STRING | .*?) #refreshResource + | REFRESH (stringLit | .*?) #refreshResource | CACHE LAZY? TABLE multipartIdentifier (OPTIONS options=propertyList)? (AS? query)? #cacheTable | UNCACHE TABLE (IF EXISTS)? multipartIdentifier #uncacheTable | CLEAR CACHE #clearCache - | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE + | LOAD DATA LOCAL? INPATH path=stringLit OVERWRITE? INTO TABLE multipartIdentifier partitionSpec? #loadData | TRUNCATE TABLE multipartIdentifier partitionSpec? #truncateTable | MSCK REPAIR TABLE multipartIdentifier @@ -203,7 +208,7 @@ statement | op=(ADD | LIST) identifier .*? #manageResource | SET ROLE .*? #failNativeCommand | SET TIME ZONE interval #setTimeZone - | SET TIME ZONE timezone=(STRING | LOCAL) #setTimeZone + | SET TIME ZONE timezone #setTimeZone | SET TIME ZONE .*? #setTimeZone | SET configKey EQ configValue #setQuotedConfiguration | SET configKey (EQ .*?)? #setConfiguration @@ -219,12 +224,18 @@ statement | unsupportedHiveNativeCommands .*? #failNativeCommand ; +timezone + : STRING + | {!double_quoted_identifiers}? DOUBLEQUOTED_STRING + | LOCAL + ; + configKey : quotedIdentifier ; configValue - : quotedIdentifier + : backQuotedIdentifier ; unsupportedHiveNativeCommands @@ -295,11 +306,11 @@ skewSpec ; locationSpec - : LOCATION STRING + : LOCATION stringLit ; commentSpec - : COMMENT STRING + : COMMENT stringLit ; query @@ -309,8 +320,8 @@ query insertInto : INSERT OVERWRITE TABLE? multipartIdentifier (partitionSpec (IF NOT EXISTS)?)? identifierList? #insertOverwriteTable | INSERT INTO TABLE? multipartIdentifier partitionSpec? (IF NOT EXISTS)? identifierList? #insertIntoTable - | INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? createFileFormat? #insertOverwriteHiveDir - | INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir + | INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat? #insertOverwriteHiveDir + | INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir ; partitionSpecLocation @@ -340,7 +351,7 @@ namespaces describeFuncName : qualifiedName - | STRING + | stringLit | comparisonOperator | arithmeticOperator | predicateOperator @@ -384,14 +395,14 @@ property propertyKey : identifier (DOT identifier)* - | STRING + | stringLit ; propertyValue : INTEGER_VALUE | DECIMAL_VALUE | booleanValue - | STRING + | stringLit ; constantList @@ -408,16 +419,16 @@ createFileFormat ; fileFormat - : INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING #tableFileFormat + : INPUTFORMAT inFmt=stringLit OUTPUTFORMAT outFmt=stringLit #tableFileFormat | identifier #genericFileFormat ; storageHandler - : STRING (WITH SERDEPROPERTIES propertyList)? + : stringLit (WITH SERDEPROPERTIES propertyList)? ; resource - : identifier STRING + : identifier stringLit ; dmlStatementNoWith @@ -508,11 +519,11 @@ transformClause | kind=MAP setQuantifier? expressionSeq | kind=REDUCE setQuantifier? expressionSeq) inRowFormat=rowFormat? - (RECORDWRITER recordWriter=STRING)? - USING script=STRING + (RECORDWRITER recordWriter=stringLit)? 
+ USING script=stringLit (AS (identifierSeq | colTypeList | (LEFT_PAREN (identifierSeq | colTypeList) RIGHT_PAREN)))? outRowFormat=rowFormat? - (RECORDREADER recordReader=STRING)? + (RECORDREADER recordReader=stringLit)? ; selectClause @@ -572,7 +583,7 @@ fromClause ; temporalClause - : FOR? (SYSTEM_VERSION | VERSION) AS OF version=(INTEGER_VALUE | STRING) + : FOR? (SYSTEM_VERSION | VERSION) AS OF version | FOR? (SYSTEM_TIME | TIMESTAMP) AS OF timestamp=valueExpression ; @@ -709,13 +720,13 @@ tableAlias ; rowFormat - : ROW FORMAT SERDE name=STRING (WITH SERDEPROPERTIES props=propertyList)? #rowFormatSerde + : ROW FORMAT SERDE name=stringLit (WITH SERDEPROPERTIES props=propertyList)? #rowFormatSerde | ROW FORMAT DELIMITED - (FIELDS TERMINATED BY fieldsTerminatedBy=STRING (ESCAPED BY escapedBy=STRING)?)? - (COLLECTION ITEMS TERMINATED BY collectionItemsTerminatedBy=STRING)? - (MAP KEYS TERMINATED BY keysTerminatedBy=STRING)? - (LINES TERMINATED BY linesSeparatedBy=STRING)? - (NULL DEFINED AS nullDefinedAs=STRING)? #rowFormatDelimited + (FIELDS TERMINATED BY fieldsTerminatedBy=stringLit (ESCAPED BY escapedBy=stringLit)?)? + (COLLECTION ITEMS TERMINATED BY collectionItemsTerminatedBy=stringLit)? + (MAP KEYS TERMINATED BY keysTerminatedBy=stringLit)? + (LINES TERMINATED BY linesSeparatedBy=stringLit)? + (NULL DEFINED AS nullDefinedAs=stringLit)? #rowFormatDelimited ; multipartIdentifierList @@ -792,7 +803,7 @@ predicate | NOT? kind=IN LEFT_PAREN query RIGHT_PAREN | NOT? kind=RLIKE pattern=valueExpression | NOT? kind=(LIKE | ILIKE) quantifier=(ANY | SOME | ALL) (LEFT_PAREN RIGHT_PAREN | LEFT_PAREN expression (COMMA expression)* RIGHT_PAREN) - | NOT? kind=(LIKE | ILIKE) pattern=valueExpression (ESCAPE escapeChar=STRING)? + | NOT? kind=(LIKE | ILIKE) pattern=valueExpression (ESCAPE escapeChar=stringLit)? | IS NOT? kind=NULL | IS NOT? kind=(TRUE | FALSE | UNKNOWN) | IS NOT? kind=DISTINCT FROM right=valueExpression @@ -856,10 +867,10 @@ primaryExpression constant : NULL #nullLiteral | interval #intervalLiteral - | identifier STRING #typeConstructor + | identifier stringLit #typeConstructor | number #numericLiteral | booleanValue #booleanLiteral - | STRING+ #stringLiteral + | stringLit+ #stringLiteral ; comparisonOperator @@ -899,7 +910,9 @@ unitToUnitInterval ; intervalValue - : (PLUS | MINUS)? (INTEGER_VALUE | DECIMAL_VALUE | STRING) + : (PLUS | MINUS)? + (INTEGER_VALUE | DECIMAL_VALUE | STRING + | {!double_quoted_identifiers}? DOUBLEQUOTED_STRING) ; colPosition @@ -1030,6 +1043,11 @@ strictIdentifier ; quotedIdentifier + : BACKQUOTED_IDENTIFIER + | {double_quoted_identifiers}? DOUBLEQUOTED_STRING + ; + +backQuotedIdentifier : BACKQUOTED_IDENTIFIER ; @@ -1055,7 +1073,22 @@ alterColumnAction | dropDefault=DROP DEFAULT ; +stringLit + : STRING + | {!double_quoted_identifiers}? DOUBLEQUOTED_STRING + ; + +comment + : STRING + | {!double_quoted_identifiers}? DOUBLEQUOTED_STRING + | NULL + ; +version + : INTEGER_VALUE + | STRING + | {!double_quoted_identifiers}? DOUBLEQUOTED_STRING + ; // When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL. 
// - Reserved keywords: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 248837cc3ae14..a89f5a0f3ae7f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -707,15 +707,15 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit isDistinct = false) ScriptTransformation( - string(transformClause.script), + string(visitStringLit(transformClause.script)), attributes, plan, withScriptIOSchema( ctx, transformClause.inRowFormat, - transformClause.recordWriter, + visitStringLit(transformClause.recordWriter), transformClause.outRowFormat, - transformClause.recordReader, + visitStringLit(transformClause.recordReader), schemaLess ) ) @@ -819,6 +819,11 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) protected def getRowFormatDelimited(ctx: RowFormatDelimitedContext): ScriptIOFormat = { + + def entry(key: String, value: StringLitContext): Seq[(String, String)] = { + Option(value).toSeq.map(x => key -> string(visitStringLit(x))) + } + // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema // expects a seq of pairs in which the old parsers' token names are used as keys. // Transforming the result of visitRowFormatDelimited would be quite a bit messier than @@ -827,8 +832,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit entry("TOK_TABLEROWFORMATCOLLITEMS", ctx.collectionItemsTerminatedBy) ++ entry("TOK_TABLEROWFORMATMAPKEYS", ctx.keysTerminatedBy) ++ entry("TOK_TABLEROWFORMATNULL", ctx.nullDefinedAs) ++ - Option(ctx.linesSeparatedBy).toSeq.map { token => - val value = string(token) + Option(ctx.linesSeparatedBy).toSeq.map { stringLitCtx => + val value = string(visitStringLit(stringLitCtx)) validate( value == "\n", s"LINES TERMINATED BY only supports newline '\\n' right now: $value", @@ -1276,14 +1281,24 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit table.optionalMap(ctx.sample)(withSample) } + override def visitVersion(ctx: VersionContext): Option[String] = { + if (ctx != null) { + if (ctx.INTEGER_VALUE != null) { + Some(ctx.INTEGER_VALUE().getText) + } else if (ctx.DOUBLEQUOTED_STRING() != null) { + Option(ctx.DOUBLEQUOTED_STRING()).map(string) + } else { + Option(ctx.STRING()).map(string) + } + } else { + None + } + } + private def withTimeTravel( ctx: TemporalClauseContext, plan: LogicalPlan): LogicalPlan = withOrigin(ctx) { val v = ctx.version - val version = if (ctx.INTEGER_VALUE != null) { - Some(v.getText) - } else { - Option(v).map(string) - } + val version = visitVersion(ctx.version) val timestamp = Option(ctx.timestamp).map(expression) if (timestamp.exists(_.references.nonEmpty)) { throw QueryParsingErrors.invalidTimeTravelSpec( @@ -1671,7 +1686,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit .map(p => invertIfNotDefined(getLike(e, p))).toSeq.reduceLeft(And) } case _ => - val escapeChar = Option(ctx.escapeChar).map(string).map { str => + val escapeChar = Option(ctx.escapeChar) + .map(stringLitCtx => string(visitStringLit(stringLitCtx))).map { str => if (str.length != 1) { throw QueryParsingErrors.invalidEscapeStringError(ctx) } @@ -2205,7 
+2221,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit * Currently Date, Timestamp, Interval and Binary typed literals are supported. */ override def visitTypeConstructor(ctx: TypeConstructorContext): Literal = withOrigin(ctx) { - val value = string(ctx.STRING) + val value = string(visitStringLit(ctx.stringLit)) val valueType = ctx.identifier.getText.toUpperCase(Locale.ROOT) def toLiteral[T](f: UTF8String => Option[T], t: DataType): Literal = { @@ -2444,9 +2460,9 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit */ private def createString(ctx: StringLiteralContext): String = { if (conf.escapedStringLiterals) { - ctx.STRING().asScala.map(stringWithoutUnescape).mkString + ctx.stringLit.asScala.map(x => stringWithoutUnescape(visitStringLit(x))).mkString } else { - ctx.STRING().asScala.map(string).mkString + ctx.stringLit.asScala.map(x => string(visitStringLit(x))).mkString } } @@ -2598,8 +2614,14 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit assert(units.length == values.length) val kvs = units.indices.map { i => val u = units(i).getText - val v = if (values(i).STRING() != null) { - val value = string(values(i).STRING()) + val v = if (values(i).STRING() != null || values(i).DOUBLEQUOTED_STRING() != null) { + val value = string(if (values(i).STRING() != null) { + values(i).STRING() + } + else { + values(i).DOUBLEQUOTED_STRING() + } + ) // SPARK-32840: For invalid cases, e.g. INTERVAL '1 day 2' hour, // INTERVAL 'interval 1' day, we need to check ahead before they are concatenated with // units and become valid ones, e.g. '1 day 2 hour'. @@ -2637,7 +2659,12 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit */ override def visitUnitToUnitInterval(ctx: UnitToUnitIntervalContext): CalendarInterval = { withOrigin(ctx) { - val value = Option(ctx.intervalValue.STRING).map(string).map { interval => + val value = Option(if (ctx.intervalValue.STRING != null) { + ctx.intervalValue.STRING + } else { + ctx.intervalValue.DOUBLEQUOTED_STRING + } + ).map(string).map { interval => if (ctx.intervalValue().MINUS() == null) { interval } else if (interval.startsWith("-")) { @@ -2869,7 +2896,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit * Create a location string. */ override def visitLocationSpec(ctx: LocationSpecContext): String = withOrigin(ctx) { - string(ctx.STRING) + string(visitStringLit(ctx.stringLit)) } /** @@ -2883,7 +2910,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit * Create a comment string. */ override def visitCommentSpec(ctx: CommentSpecContext): String = withOrigin(ctx) { - string(ctx.STRING) + string(visitStringLit(ctx.stringLit)) } /** @@ -2978,8 +3005,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit * identifier. 
*/ override def visitPropertyKey(key: PropertyKeyContext): String = { - if (key.STRING != null) { - string(key.STRING) + if (key.stringLit() != null) { + string(visitStringLit(key.stringLit())) } else { key.getText } @@ -2992,8 +3019,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit override def visitPropertyValue(value: PropertyValueContext): String = { if (value == null) { null - } else if (value.STRING != null) { - string(value.STRING) + } else if (value.stringLit() != null) { + string(visitStringLit(value.stringLit())) } else if (value.booleanValue != null) { value.getText.toLowerCase(Locale.ROOT) } else { @@ -3001,6 +3028,18 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit } } + override def visitStringLit(ctx: StringLitContext): Token = { + if (ctx != null) { + if (ctx.STRING != null) { + ctx.STRING.getSymbol + } else { + ctx.DOUBLEQUOTED_STRING.getSymbol + } + } else { + null + } + } + /** * Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal). */ @@ -3266,7 +3305,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit val multiPart = Option(ctx.multipartIdentifier).map(visitMultipartIdentifier) ShowNamespaces( UnresolvedNamespace(multiPart.getOrElse(Seq.empty[String])), - Option(ctx.pattern).map(string)) + Option(ctx.pattern).map(x => string(visitStringLit(x)))) } /** @@ -3342,7 +3381,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit (ctx.fileFormat, ctx.storageHandler) match { // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format case (c: TableFileFormatContext, null) => - SerdeInfo(formatClasses = Some(FormatClasses(string(c.inFmt), string(c.outFmt)))) + SerdeInfo(formatClasses = Some(FormatClasses(string(visitStringLit(c.inFmt)), + string(visitStringLit(c.outFmt))))) // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO case (c: GenericFileFormatContext, null) => SerdeInfo(storedAs = Some(c.identifier.getText)) @@ -3384,7 +3424,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit override def visitRowFormatSerde(ctx: RowFormatSerdeContext): SerdeInfo = withOrigin(ctx) { import ctx._ SerdeInfo( - serde = Some(string(name)), + serde = Some(string(visitStringLit(name))), serdeProperties = Option(propertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)) } @@ -3394,8 +3434,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit override def visitRowFormatDelimited( ctx: RowFormatDelimitedContext): SerdeInfo = withOrigin(ctx) { // Collect the entries if any. - def entry(key: String, value: Token): Seq[(String, String)] = { - Option(value).toSeq.map(x => key -> string(x)) + def entry(key: String, value: StringLitContext): Seq[(String, String)] = { + Option(value).toSeq.map(x => key -> string(visitStringLit(x))) } // TODO we need proper support for the NULL format. 
val entries = @@ -3406,7 +3446,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++ entry("mapkey.delim", ctx.keysTerminatedBy) ++ Option(ctx.linesSeparatedBy).toSeq.map { token => - val value = string(token) + val value = string(visitStringLit(token)) validate( value == "\n", s"LINES TERMINATED BY only supports newline '\\n' right now: $value", @@ -3714,7 +3754,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit val multiPart = Option(ctx.multipartIdentifier).map(visitMultipartIdentifier) ShowTables( UnresolvedNamespace(multiPart.getOrElse(Seq.empty[String])), - Option(ctx.pattern).map(string)) + Option(ctx.pattern).map(x => string(visitStringLit(x)))) } /** @@ -3728,7 +3768,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit } ShowTableExtended( UnresolvedNamespace(multiPart.getOrElse(Seq.empty[String])), - string(ctx.pattern), + string(visitStringLit(ctx.pattern)), partitionKeys) } @@ -3739,7 +3779,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit val multiPart = Option(ctx.multipartIdentifier).map(visitMultipartIdentifier) ShowViews( UnresolvedNamespace(multiPart.getOrElse(Seq.empty[String])), - Option(ctx.pattern).map(string)) + Option(ctx.pattern).map(x => string(visitStringLit(x)))) } override def visitColPosition(ctx: ColPositionContext): ColumnPosition = { @@ -4193,7 +4233,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit override def visitLoadData(ctx: LoadDataContext): LogicalPlan = withOrigin(ctx) { LoadData( child = createUnresolvedTable(ctx.multipartIdentifier, "LOAD DATA"), - path = string(ctx.path), + path = string(visitStringLit(ctx.path)), isLocal = ctx.LOCAL != null, isOverwrite = ctx.OVERWRITE != null, partition = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec) @@ -4439,7 +4479,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit ctx.multipartIdentifier, "ALTER TABLE ... SET [SERDE|SERDEPROPERTIES]", alterTableTypeMismatchHint), - Option(ctx.STRING).map(string), + Option(ctx.stringLit).map(x => string(visitStringLit(x))), Option(ctx.propertyList).map(visitPropertyKeyValues), // TODO a partition spec is allowed to have optional values. This is currently violated. 
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)) @@ -4500,8 +4540,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit override def visitDescribeFunction(ctx: DescribeFunctionContext): LogicalPlan = withOrigin(ctx) { import ctx._ val functionName = - if (describeFuncName.STRING() != null) { - Seq(string(describeFuncName.STRING())) + if (describeFuncName.stringLit() != null) { + Seq(string(visitStringLit(describeFuncName.stringLit()))) } else if (describeFuncName.qualifiedName() != null) { visitQualifiedName(describeFuncName.qualifiedName) } else { @@ -4540,7 +4580,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit } else { UnresolvedNamespace(Nil) } - val pattern = Option(ctx.pattern).map(string).orElse(legacy.map(_.last)) + val pattern = Option(ctx.pattern).map(x => string(visitStringLit(x))).orElse(legacy.map(_.last)) ShowFunctions(nsPlan, userScope, systemScope, pattern) } @@ -4554,22 +4594,26 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit } override def visitCommentNamespace(ctx: CommentNamespaceContext): LogicalPlan = withOrigin(ctx) { - val comment = ctx.comment.getType match { - case SqlBaseParser.NULL => "" - case _ => string(ctx.STRING) - } + val comment = visitComment(ctx.comment) val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier) CommentOnNamespace(UnresolvedNamespace(nameParts), comment) } override def visitCommentTable(ctx: CommentTableContext): LogicalPlan = withOrigin(ctx) { - val comment = ctx.comment.getType match { - case SqlBaseParser.NULL => "" - case _ => string(ctx.STRING) - } + val comment = visitComment(ctx.comment) CommentOnTable(createUnresolvedTable(ctx.multipartIdentifier, "COMMENT ON TABLE"), comment) } + override def visitComment (ctx: CommentContext): String = { + if (ctx.STRING() != null) { + string(ctx.STRING) + } else if (ctx.DOUBLEQUOTED_STRING() != null) { + string(ctx.DOUBLEQUOTED_STRING()) + } else { + "" + } + } + /** * Create an index, returning a [[CreateIndex]] logical plan. * For example: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala index c85a0c2cd45fc..d22514ade78d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala @@ -118,6 +118,7 @@ abstract class AbstractSqlParser extends ParserInterface with SQLConfHelper with parser.legacy_setops_precedence_enabled = conf.setOpsPrecedenceEnforced parser.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled parser.SQL_standard_keyword_behavior = conf.enforceReservedKeywords + parser.double_quoted_identifiers = conf.double_quoted_identifiers try { try { @@ -347,6 +348,23 @@ case object PostProcessor extends SqlBaseParserBaseListener { /** Remove the back ticks from an Identifier. */ override def exitQuotedIdentifier(ctx: SqlBaseParser.QuotedIdentifierContext): Unit = { + if (ctx.BACKQUOTED_IDENTIFIER() != null) { + replaceTokenByIdentifier(ctx, 1) { token => + // Remove the double back ticks in the string. + token.setText(token.getText.replace("``", "`")) + token + } + } else if (ctx.DOUBLEQUOTED_STRING() != null) { + replaceTokenByIdentifier(ctx, 1) { token => + // Remove the double quotes in the string. 
+ token.setText(token.getText.replace("\"\"", "\"")) + token + } + } + } + + /** Remove the back ticks from an Identifier. */ + override def exitBackQuotedIdentifier(ctx: SqlBaseParser.BackQuotedIdentifierContext): Unit = { replaceTokenByIdentifier(ctx, 1) { token => // Remove the double back ticks in the string. token.setText(token.getText.replace("``", "`")) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala index 6be8d922bb869..acd0ecfd10921 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala @@ -98,7 +98,7 @@ object ParserUtils { def string(node: TerminalNode): String = unescapeSQLString(node.getText) /** Convert a string node into a string without unescaping. */ - def stringWithoutUnescape(node: TerminalNode): String = { + def stringWithoutUnescape(node: Token): String = { // STRING parser rule forces that the input always has quotes at the starting and ending. node.getText.slice(1, node.getText.size - 1) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 44d34af1e47e5..5a5f3a83a2a16 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2909,6 +2909,13 @@ object SQLConf { .booleanConf .createWithDefault(sys.env.get("SPARK_ANSI_SQL_MODE").contains("true")) + val DOUBLE_QUOTED_IDENTIFIERS = buildConf("spark.sql.ansi.double_quoted_identifiers") + .doc("When true, Spark SQL reads literals enclosed in double quoted (\") as identifiers. 
" + + "When false they are read as string literals.") + .version("3.4.0") + .booleanConf + .createWithDefault(false) + val ENABLE_DEFAULT_COLUMNS = buildConf("spark.sql.defaultColumn.enabled") .internal() @@ -4585,6 +4592,8 @@ class SQLConf extends Serializable with Logging { def enforceReservedKeywords: Boolean = ansiEnabled && getConf(ENFORCE_RESERVED_KEYWORDS) + def double_quoted_identifiers: Boolean = getConf(DOUBLE_QUOTED_IDENTIFIERS) + def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match { case "TIMESTAMP_LTZ" => // For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL TIME ZONE diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala index 818ddb63104a5..1d9965548a205 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala @@ -180,10 +180,10 @@ class ParserUtilsSuite extends SparkFunSuite { } test("string") { - assert(string(showDbsContext.pattern) == "identifier_with_wildcards") - assert(string(createDbContext.commentSpec().get(0).STRING()) == "database_comment") + assert(string(showDbsContext.pattern.STRING()) == "identifier_with_wildcards") + assert(string(createDbContext.commentSpec().get(0).stringLit().STRING()) == "database_comment") - assert(string(createDbContext.locationSpec.asScala.head.STRING) == "/home/user/db") + assert(string(createDbContext.locationSpec.asScala.head.stringLit().STRING) == "/home/user/db") } test("position") { @@ -211,7 +211,7 @@ class ParserUtilsSuite extends SparkFunSuite { val ctx = createDbContext.locationSpec.asScala.head val current = CurrentOrigin.get val (location, origin) = withOrigin(ctx) { - (string(ctx.STRING), CurrentOrigin.get) + (string(ctx.stringLit().STRING), CurrentOrigin.get) } assert(location == "/home/user/db") assert(origin == Origin(Some(3), Some(27))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index a82fc47f427ae..5719b0566df99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -161,22 +161,31 @@ class SparkSqlAstBuilder extends AstBuilder { SetCommand(Some(key -> Some(ZoneOffset.ofTotalSeconds(seconds).toString))) } } else if (ctx.timezone != null) { - ctx.timezone.getType match { - case SqlBaseParser.LOCAL => - SetCommand(Some(key -> Some(TimeZone.getDefault.getID))) - case _ => - SetCommand(Some(key -> Some(string(ctx.STRING)))) - } + SetCommand(Some(key -> Some(visitTimezone(ctx.timezone())))) } else { throw QueryParsingErrors.invalidTimeZoneDisplacementValueError(ctx) } } + override def visitTimezone (ctx: TimezoneContext): String = { + if (ctx.STRING() != null) { + string(ctx.STRING) + } else if (ctx.DOUBLEQUOTED_STRING() != null) { + string(ctx.DOUBLEQUOTED_STRING()) + } else { + TimeZone.getDefault.getID + } + } + /** * Create a [[RefreshResource]] logical plan. 
*/ override def visitRefreshResource(ctx: RefreshResourceContext): LogicalPlan = withOrigin(ctx) { - val path = if (ctx.STRING != null) string(ctx.STRING) else extractUnquotedResourcePath(ctx) + val path = if (ctx.stringLit != null) { + string(visitStringLit(ctx.stringLit)) + } else { + extractUnquotedResourcePath(ctx) + } RefreshResource(path) } @@ -258,8 +267,8 @@ class SparkSqlAstBuilder extends AstBuilder { override def visitSetCatalog(ctx: SetCatalogContext): LogicalPlan = withOrigin(ctx) { if (ctx.identifier() != null) { SetCatalogCommand(ctx.identifier().getText) - } else if (ctx.STRING() != null) { - SetCatalogCommand(string(ctx.STRING())) + } else if (ctx.stringLit() != null) { + SetCatalogCommand(string(visitStringLit(ctx.stringLit()))) } else { throw new IllegalStateException("Invalid catalog name") } @@ -269,7 +278,7 @@ class SparkSqlAstBuilder extends AstBuilder { * Create a [[ShowCatalogsCommand]] logical command. */ override def visitShowCatalogs(ctx: ShowCatalogsContext) : LogicalPlan = withOrigin(ctx) { - ShowCatalogsCommand(Option(ctx.pattern).map(string)) + ShowCatalogsCommand(Option(ctx.pattern).map(x => string(visitStringLit(x)))) } /** @@ -534,7 +543,8 @@ class SparkSqlAstBuilder extends AstBuilder { val resourceType = resource.identifier.getText.toLowerCase(Locale.ROOT) resourceType match { case "jar" | "file" | "archive" => - FunctionResource(FunctionResourceType.fromString(resourceType), string(resource.STRING)) + FunctionResource(FunctionResourceType.fromString(resourceType), + string(visitStringLit(resource.stringLit()))) case other => operationNotAllowed(s"CREATE FUNCTION with resource type '$resourceType'", ctx) } @@ -548,7 +558,7 @@ class SparkSqlAstBuilder extends AstBuilder { if (ctx.TEMPORARY == null) { CreateFunction( UnresolvedIdentifier(functionIdentifier), - string(ctx.className), + string(visitStringLit(ctx.className)), resources.toSeq, ctx.EXISTS != null, ctx.REPLACE != null) @@ -566,7 +576,7 @@ class SparkSqlAstBuilder extends AstBuilder { } CreateFunctionCommand( FunctionIdentifier(functionIdentifier.last), - string(ctx.className), + string(visitStringLit(ctx.className)), resources.toSeq, true, ctx.EXISTS != null, @@ -788,7 +798,7 @@ class SparkSqlAstBuilder extends AstBuilder { val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) var storage = DataSource.buildStorageFormatFromOptions(options) - val path = Option(ctx.path).map(string).getOrElse("") + val path = Option(ctx.path).map(x => string(visitStringLit(x))).getOrElse("") if (!(path.isEmpty ^ storage.locationUri.isEmpty)) { throw QueryParsingErrors.directoryPathAndOptionsPathBothSpecifiedError(ctx) @@ -833,7 +843,7 @@ class SparkSqlAstBuilder extends AstBuilder { ctx: InsertOverwriteHiveDirContext): InsertDirParams = withOrigin(ctx) { val serdeInfo = getSerdeInfo( Option(ctx.rowFormat).toSeq, Option(ctx.createFileFormat).toSeq, ctx) - val path = string(ctx.path) + val path = string(visitStringLit(ctx.path)) // The path field is required if (path.isEmpty) { operationNotAllowed("INSERT OVERWRITE DIRECTORY must be accompanied by path", ctx) diff --git a/sql/core/src/test/resources/sql-tests/inputs/double-quoted-identifiers.sql b/sql/core/src/test/resources/sql-tests/inputs/double-quoted-identifiers.sql new file mode 100644 index 0000000000000..7fe35e5a410ba --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/double-quoted-identifiers.sql @@ -0,0 +1,107 @@ +-- test cases for spark.sql.ansi.double_quoted_identifiers + +-- Base line +SET 
spark.sql.ansi.double_quoted_identifiers = false; + +-- All these should error out in the parser +SELECT 1 FROM "not_exist"; + +USE SCHEMA "not_exist"; + +ALTER TABLE "not_exist" ADD COLUMN not_exist int; + +ALTER TABLE not_exist ADD COLUMN "not_exist" int; + +SELECT 1 AS "not_exist" FROM not_exist; + +SELECT 1 FROM not_exist AS X("hello"); + +SELECT "not_exist"(); + +SELECT "not_exist".not_exist(); + +-- All these should error out in analysis +SELECT 1 FROM `hello`; + +USE SCHEMA `not_exist`; + +ALTER TABLE `not_exist` ADD COLUMN not_exist int; + +ALTER TABLE not_exist ADD COLUMN `not_exist` int; + +SELECT 1 AS `not_exist` FROM `not_exist`; + +SELECT 1 FROM not_exist AS X(`hello`); + +SELECT `not_exist`(); + +SELECT `not_exist`.not_exist(); + +-- Strings in various situations all work +SELECT "hello"; + +CREATE TEMPORARY VIEW v(c1 COMMENT "hello") AS SELECT 1; +DROP VIEW v; + +SELECT INTERVAL "1" YEAR; + +-- Now turn on the config. +SET spark.sql.ansi.double_quoted_identifiers = true; + +-- All these should error out in analysis now +SELECT 1 FROM "not_exist"; + +USE SCHEMA "not_exist"; + +ALTER TABLE "not_exist" ADD COLUMN not_exist int; + +ALTER TABLE not_exist ADD COLUMN "not_exist" int; + +SELECT 1 AS "not_exist" FROM not_exist; + +SELECT 1 FROM not_exist AS X("hello"); + +SELECT "not_exist"(); + +SELECT "not_exist".not_exist(); + +SELECT "hello"; + +-- Back ticks still work +SELECT 1 FROM `hello`; + +USE SCHEMA `not_exist`; + +ALTER TABLE `not_exist` ADD COLUMN not_exist int; + +ALTER TABLE not_exist ADD COLUMN `not_exist` int; + +SELECT 1 AS `not_exist` FROM `not_exist`; + +SELECT 1 FROM not_exist AS X(`hello`); + +SELECT `not_exist`(); + +SELECT `not_exist`.not_exist(); + +-- These fail in the parser now +CREATE TEMPORARY VIEW v(c1 COMMENT "hello") AS SELECT 1; +DROP VIEW v; + +SELECT INTERVAL "1" YEAR; + +-- Single ticks still work +SELECT 'hello'; + +CREATE TEMPORARY VIEW v(c1 COMMENT 'hello') AS SELECT 1; +DROP VIEW v; + +SELECT INTERVAL '1' YEAR; + +-- A whole scenario +CREATE SCHEMA "myschema"; +CREATE TEMPORARY VIEW "myview"("c1") AS + WITH "v"("a") AS (SELECT 1) SELECT "a" FROM "v"; +SELECT "a1" AS "a2" FROM "myview" AS "atab"("a1"); +DROP TABLE "myview"; +DROP SCHEMA "myschema"; diff --git a/sql/core/src/test/resources/sql-tests/results/double-quoted-identifiers.sql.out b/sql/core/src/test/resources/sql-tests/results/double-quoted-identifiers.sql.out new file mode 100644 index 0000000000000..9207c76e9a0f9 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/double-quoted-identifiers.sql.out @@ -0,0 +1,608 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +SET spark.sql.ansi.double_quoted_identifiers = false +-- !query schema +struct +-- !query output +spark.sql.ansi.double_quoted_identifiers false + + +-- !query +SELECT 1 FROM "not_exist" +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42000", + "messageParameters" : { + "error" : "'\"not_exist\"'", + "hint" : "" + } +} + + +-- !query +USE SCHEMA "not_exist" +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42000", + "messageParameters" : { + "error" : "'\"not_exist\"'", + "hint" : "" + } +} + + +-- !query +ALTER TABLE "not_exist" ADD COLUMN not_exist int +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : 
"PARSE_SYNTAX_ERROR", + "sqlState" : "42000", + "messageParameters" : { + "error" : "'\"not_exist\"'", + "hint" : "" + } +} + + +-- !query +ALTER TABLE not_exist ADD COLUMN "not_exist" int +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42000", + "messageParameters" : { + "error" : "'\"not_exist\"'", + "hint" : "" + } +} + + +-- !query +SELECT 1 AS "not_exist" FROM not_exist +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42000", + "messageParameters" : { + "error" : "'\"not_exist\"'", + "hint" : "" + } +} + + +-- !query +SELECT 1 FROM not_exist AS X("hello") +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42000", + "messageParameters" : { + "error" : "'\"hello\"'", + "hint" : "" + } +} + + +-- !query +SELECT "not_exist"() +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42000", + "messageParameters" : { + "error" : "'\"not_exist\"'", + "hint" : "" + } +} + + +-- !query +SELECT "not_exist".not_exist() +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42000", + "messageParameters" : { + "error" : "'\"not_exist\"'", + "hint" : "" + } +} + + +-- !query +SELECT 1 FROM `hello` +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table or view not found: hello; line 1 pos 14 + + +-- !query +USE SCHEMA `not_exist` +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException +Database 'not_exist' not found + + +-- !query +ALTER TABLE `not_exist` ADD COLUMN not_exist int +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table not found: not_exist; line 1 pos 12 + + +-- !query +ALTER TABLE not_exist ADD COLUMN `not_exist` int +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table not found: not_exist; line 1 pos 12 + + +-- !query +SELECT 1 AS `not_exist` FROM `not_exist` +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table or view not found: not_exist; line 1 pos 29 + + +-- !query +SELECT 1 FROM not_exist AS X(`hello`) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table or view not found: not_exist; line 1 pos 14 + + +-- !query +SELECT `not_exist`() +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_1242", + "messageParameters" : { + "fullName" : "spark_catalog.default.not_exist", + "rawName" : "not_exist" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 20, + "fragment" : "`not_exist`()" + } ] +} + + +-- !query +SELECT `not_exist`.not_exist() +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_1243", + "messageParameters" : { + "rawName" : "not_exist.not_exist" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 30, + "fragment" : "`not_exist`.not_exist()" + } ] +} + + +-- 
!query +SELECT "hello" +-- !query schema +struct +-- !query output +hello + + +-- !query +CREATE TEMPORARY VIEW v(c1 COMMENT "hello") AS SELECT 1 +-- !query schema +struct<> +-- !query output + + + +-- !query +DROP VIEW v +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT INTERVAL "1" YEAR +-- !query schema +struct +-- !query output +1-0 + + +-- !query +SET spark.sql.ansi.double_quoted_identifiers = true +-- !query schema +struct +-- !query output +spark.sql.ansi.double_quoted_identifiers true + + +-- !query +SELECT 1 FROM "not_exist" +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table or view not found: not_exist; line 1 pos 14 + + +-- !query +USE SCHEMA "not_exist" +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException +Database 'not_exist' not found + + +-- !query +ALTER TABLE "not_exist" ADD COLUMN not_exist int +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table not found: not_exist; line 1 pos 12 + + +-- !query +ALTER TABLE not_exist ADD COLUMN "not_exist" int +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table not found: not_exist; line 1 pos 12 + + +-- !query +SELECT 1 AS "not_exist" FROM not_exist +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table or view not found: not_exist; line 1 pos 29 + + +-- !query +SELECT 1 FROM not_exist AS X("hello") +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table or view not found: not_exist; line 1 pos 14 + + +-- !query +SELECT "not_exist"() +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_1242", + "messageParameters" : { + "fullName" : "spark_catalog.default.not_exist", + "rawName" : "not_exist" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 20, + "fragment" : "\"not_exist\"()" + } ] +} + + +-- !query +SELECT "not_exist".not_exist() +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_1243", + "messageParameters" : { + "rawName" : "not_exist.not_exist" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 30, + "fragment" : "\"not_exist\".not_exist()" + } ] +} + + +-- !query +SELECT "hello" +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "UNRESOLVED_COLUMN", + "errorSubClass" : "WITHOUT_SUGGESTION", + "sqlState" : "42000", + "messageParameters" : { + "objectName" : "`hello`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 14, + "fragment" : "\"hello\"" + } ] +} + + +-- !query +SELECT 1 FROM `hello` +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table or view not found: hello; line 1 pos 14 + + +-- !query +USE SCHEMA `not_exist` +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException +Database 'not_exist' not found + + +-- !query +ALTER TABLE `not_exist` ADD COLUMN not_exist int +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table not found: not_exist; line 1 pos 12 + + +-- !query +ALTER TABLE not_exist ADD COLUMN `not_exist` int +-- !query schema +struct<> +-- !query 
output +org.apache.spark.sql.AnalysisException +Table not found: not_exist; line 1 pos 12 + + +-- !query +SELECT 1 AS `not_exist` FROM `not_exist` +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table or view not found: not_exist; line 1 pos 29 + + +-- !query +SELECT 1 FROM not_exist AS X(`hello`) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table or view not found: not_exist; line 1 pos 14 + + +-- !query +SELECT `not_exist`() +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_1242", + "messageParameters" : { + "fullName" : "spark_catalog.default.not_exist", + "rawName" : "not_exist" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 20, + "fragment" : "`not_exist`()" + } ] +} + + +-- !query +SELECT `not_exist`.not_exist() +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_1243", + "messageParameters" : { + "rawName" : "not_exist.not_exist" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 30, + "fragment" : "`not_exist`.not_exist()" + } ] +} + + +-- !query +CREATE TEMPORARY VIEW v(c1 COMMENT "hello") AS SELECT 1 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42000", + "messageParameters" : { + "error" : "'\"hello\"'", + "hint" : "" + } +} + + +-- !query +DROP VIEW v +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.analysis.NoSuchTableException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_1115", + "messageParameters" : { + "msg" : "Table spark_catalog.default.v not found" + } +} + + +-- !query +SELECT INTERVAL "1" YEAR +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42000", + "messageParameters" : { + "error" : "'\"1\"'", + "hint" : "" + } +} + + +-- !query +SELECT 'hello' +-- !query schema +struct +-- !query output +hello + + +-- !query +CREATE TEMPORARY VIEW v(c1 COMMENT 'hello') AS SELECT 1 +-- !query schema +struct<> +-- !query output + + + +-- !query +DROP VIEW v +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT INTERVAL '1' YEAR +-- !query schema +struct +-- !query output +1-0 + + +-- !query +CREATE SCHEMA "myschema" +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMPORARY VIEW "myview"("c1") AS + WITH "v"("a") AS (SELECT 1) SELECT "a" FROM "v" +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT "a1" AS "a2" FROM "myview" AS "atab"("a1") +-- !query schema +struct +-- !query output +1 + + +-- !query +DROP TABLE "myview" +-- !query schema +struct<> +-- !query output + + + +-- !query +DROP SCHEMA "myschema" +-- !query schema +struct<> +-- !query output + From 505a8a04ad64a6732bf9fec03c28bfbd514d109d Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 4 Oct 2022 17:26:34 -0700 Subject: [PATCH 4/9] [SPARK-40428][CORE] Fix shutdown hook in the CoarseGrainedSchedulerBackend ### What changes were proposed in this pull request? Fix the shutdown hook call through to CoarseGrainedSchedulerBackend ### Why are the changes needed? Sometimes if the driver shuts down abnormally resources may be left dangling. ### Does this PR introduce _any_ user-facing change? 
No ### How was this patch tested? Existing tests. Closes #37885 from holdenk/shutdownhook-for-k8s. Lead-authored-by: Holden Karau Co-authored-by: Holden Karau Signed-off-by: Holden Karau --- .../scala/org/apache/spark/SparkContext.scala | 4 +++- .../apache/spark/scheduler/DAGScheduler.scala | 16 ++++++++++++---- .../spark/scheduler/TaskSchedulerImpl.scala | 16 ++++++++++++---- 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f101dc8e083f4..e2c6a912bc270 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2119,7 +2119,9 @@ class SparkContext(config: SparkConf) extends Logging { Utils.tryLogNonFatalError { _plugins.foreach(_.shutdown()) } - FallbackStorage.cleanUp(_conf, _hadoopConfiguration) + Utils.tryLogNonFatalError { + FallbackStorage.cleanUp(_conf, _hadoopConfiguration) + } Utils.tryLogNonFatalError { _eventLogger.foreach(_.stop()) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index ab05409aebb5a..c55298513824c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -2890,10 +2890,18 @@ private[spark] class DAGScheduler( } def stop(): Unit = { - messageScheduler.shutdownNow() - shuffleMergeFinalizeScheduler.shutdownNow() - eventProcessLoop.stop() - taskScheduler.stop() + Utils.tryLogNonFatalError { + messageScheduler.shutdownNow() + } + Utils.tryLogNonFatalError { + shuffleMergeFinalizeScheduler.shutdownNow() + } + Utils.tryLogNonFatalError { + eventProcessLoop.stop() + } + Utils.tryLogNonFatalError { + taskScheduler.stop() + } } eventProcessLoop.start() diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index a6735f380f18e..5004262a71c1c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -972,15 +972,23 @@ private[spark] class TaskSchedulerImpl( } override def stop(): Unit = { - speculationScheduler.shutdown() + Utils.tryLogNonFatalError { + speculationScheduler.shutdown() + } if (backend != null) { - backend.stop() + Utils.tryLogNonFatalError { + backend.stop() + } } if (taskResultGetter != null) { - taskResultGetter.stop() + Utils.tryLogNonFatalError { + taskResultGetter.stop() + } } if (barrierCoordinator != null) { - barrierCoordinator.stop() + Utils.tryLogNonFatalError { + barrierCoordinator.stop() + } } starvationTimer.cancel() abortTimer.cancel() From 99a76a62ea33dcfabc638bb2e9c3c8e7d5844862 Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Wed, 5 Oct 2022 11:23:26 +0900 Subject: [PATCH 5/9] [SPARK-40645][CONNECT] Throw exception for Collect() and recommend to use toPandas() ### What changes were proposed in this pull request? 
Current connect `Collect()` return Pandas DataFrame, which does not match with PySpark DataFrame API which returns a `List[Row]`: https://github.com/apache/spark/blob/ceb8527413288b4d5c54d3afd76d00c9e26817a1/python/pyspark/sql/connect/data_frame.py#L227 https://github.com/apache/spark/blob/ceb8527413288b4d5c54d3afd76d00c9e26817a1/python/pyspark/sql/dataframe.py#L1119 The underlying implementation has been generating Pandas DataFrame though. In this case, we can choose to use to `toPandas()` and throw exception for `Collect()` to recommend to use `toPandas()`. ### Why are the changes needed? The goal of the connect project is still to align with existing data frame API as much as possible. In this case, given that `Collect()` is not compatible in existing python client, we can choose to disable it for now. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #38089 from amaliujia/SPARK-40645. Lead-authored-by: Rui Wang Co-authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/client.py | 2 +- python/pyspark/sql/connect/data_frame.py | 10 ++++++---- python/pyspark/sql/tests/connect/test_spark_connect.py | 4 ++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py index df42411d42cb8..7d080761c46e1 100644 --- a/python/pyspark/sql/connect/client.py +++ b/python/pyspark/sql/connect/client.py @@ -141,7 +141,7 @@ def _build_metrics(self, metrics: "pb2.Response.Metrics") -> typing.List[PlanMet def sql(self, sql_string: str) -> "DataFrame": return DataFrame.withPlan(SQL(sql_string), self) - def collect(self, plan: pb2.Plan) -> pandas.DataFrame: + def _to_pandas(self, plan: pb2.Plan) -> pandas.DataFrame: req = pb2.Request() req.user_context.user_id = self._user_id req.plan.CopyFrom(plan) diff --git a/python/pyspark/sql/connect/data_frame.py b/python/pyspark/sql/connect/data_frame.py index b229cad198035..2ba11628a06f4 100644 --- a/python/pyspark/sql/connect/data_frame.py +++ b/python/pyspark/sql/connect/data_frame.py @@ -27,6 +27,8 @@ TYPE_CHECKING, ) +import pandas + import pyspark.sql.connect.plan as plan from pyspark.sql.connect.column import ( ColumnOrString, @@ -225,11 +227,11 @@ def _print_plan(self) -> str: return "" def collect(self): - query = self._plan.collect(self._session) - return self._session.collect(query) + raise NotImplementedError("Please use toPandas().") - def toPandas(self): - return self.collect() + def toPandas(self) -> pandas.DataFrame: + query = self._plan.collect(self._session) + return self._session._to_pandas(query) def explain(self) -> str: query = self._plan.collect(self._session) diff --git a/python/pyspark/sql/tests/connect/test_spark_connect.py b/python/pyspark/sql/tests/connect/test_spark_connect.py index cac17b7397dcb..7e891c5cf19f8 100644 --- a/python/pyspark/sql/tests/connect/test_spark_connect.py +++ b/python/pyspark/sql/tests/connect/test_spark_connect.py @@ -57,7 +57,7 @@ class SparkConnectTests(SparkConnectSQLTestCase): def test_simple_read(self) -> None: """Tests that we can access the Spark Connect GRPC service locally.""" df = self.connect.read.table(self.tbl_name) - data = df.limit(10).collect() + data = df.limit(10).toPandas() # Check that the limit is applied assert len(data.index) == 10 @@ -67,7 +67,7 @@ def conv_udf(x) -> str: u = udf(conv_udf) df = self.connect.read.table(self.tbl_name) - result = df.select(u(df.id)).collect() + result = df.select(u(df.id)).toPandas() assert result is not 
None def test_simple_explain_string(self) -> None: From ccb837f7fbea36dee3e2df2f8ff84f4947e2e513 Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Wed, 5 Oct 2022 13:58:08 +0800 Subject: [PATCH 6/9] [SPARK-40587][CONNECT] Support SELECT * in an explicit way in connect proto ### What changes were proposed in this pull request? Support `SELECT *` in an explicit way by connect proto. ### Why are the changes needed? Current proto uses empty project list for `SELECT *`. However, this is an implicit way that it is hard to differentiate `not set` and `set but empty` (the latter is invalid plan). For longer term proto compatibility, we should always use explicit fields for passing through information. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #38023 from amaliujia/SPARK-40587. Authored-by: Rui Wang Signed-off-by: Wenchen Fan --- .../protobuf/spark/connect/expressions.proto | 4 ++++ .../connect/planner/SparkConnectPlanner.scala | 4 +++- .../planner/SparkConnectPlannerSuite.scala | 17 +++++++++++++++++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/connect/src/main/protobuf/spark/connect/expressions.proto b/connect/src/main/protobuf/spark/connect/expressions.proto index 6b72a646623c7..74df0cc2f2141 100644 --- a/connect/src/main/protobuf/spark/connect/expressions.proto +++ b/connect/src/main/protobuf/spark/connect/expressions.proto @@ -34,6 +34,7 @@ message Expression { UnresolvedAttribute unresolved_attribute = 2; UnresolvedFunction unresolved_function = 3; ExpressionString expression_string = 4; + UnresolvedStar unresolved_star = 5; } message Literal { @@ -155,4 +156,7 @@ message Expression { string expression = 1; } + // UnresolvedStar is used to expand all the fields of a relation or struct. + message UnresolvedStar { + } } diff --git a/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index ee2e05a1e606a..be1d5240ad43f 100644 --- a/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -96,7 +96,9 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) { rel: proto.Project, common: Option[proto.RelationCommon]): LogicalPlan = { val baseRel = transformRelation(rel.getInput) - val projection = if (rel.getExpressionsCount == 0) { + // TODO: support the target field for *. 
+ val projection = + if (rel.getExpressionsCount == 1 && rel.getExpressions(0).hasUnresolvedStar) { Seq(UnresolvedStar(Option.empty)) } else { rel.getExpressionsList.asScala.map(transformExpression).map(UnresolvedAlias(_)) diff --git a/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala index f24b09a8dd420..e1a658fb57b27 100644 --- a/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala +++ b/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.Expression.UnresolvedStar import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -91,6 +92,22 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest { assert(res.nodeName == "UnresolvedRelation") } + test("Simple Project") { + val readWithTable = proto.Read.newBuilder() + .setNamedTable(proto.Read.NamedTable.newBuilder.addParts("name").build()) + .build() + val project = + proto.Project.newBuilder() + .setInput(proto.Relation.newBuilder().setRead(readWithTable).build()) + .addExpressions( + proto.Expression.newBuilder() + .setUnresolvedStar(UnresolvedStar.newBuilder().build()).build() + ).build() + val res = transform(proto.Relation.newBuilder.setProject(project).build()) + assert(res !== null) + assert(res.nodeName == "Project") + } + test("Simple Sort") { val sort = proto.Sort.newBuilder .addAllSortFields(Seq(proto.Sort.SortField.newBuilder().build()).asJava) From e6bebb66651a1ff06f821bd4ee2b7b52bd532c01 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Wed, 5 Oct 2022 14:00:55 +0800 Subject: [PATCH 7/9] [SPARK-40660][CORE][SQL] Switch to XORShiftRandom to distribute elements ### What changes were proposed in this pull request? This PR replaces `Random(hashing.byteswap32(index))` with `XORShiftRandom(index)` to distribute elements evenly across output partitions. ### Why are the changes needed? It seems that the distribution using `XORShiftRandom` is better. For example: 1. The number of output files has changed since SPARK-40407. [Some downstream projects](https://github.com/apache/iceberg/blob/c07f2aabc0a1d02f068ecf1514d2479c0fbdd3b0/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java#L578-L579) use repartition to determine the number of output files in the test. ``` bin/spark-shell --master "local[2]" spark.range(10).repartition(10).write.mode("overwrite").parquet("/tmp/spark/repartition") ``` Before this PR and after SPARK-40407, the number of output files is 8. After this PR or before SPARK-40407, the number of output files is 10. 2. The distribution using `XORShiftRandom` seem better. ```scala import java.util.Random import org.apache.spark.util.random.XORShiftRandom import scala.util.hashing def distribution(count: Int, partition: Int) = { println((1 to count).map(partitionId => new Random(partitionId).nextInt(partition)) .groupBy(f => f) .map(_._2.size).mkString(". ")) println((1 to count).map(partitionId => new Random(hashing.byteswap32(partitionId)).nextInt(partition)) .groupBy(f => f) .map(_._2.size).mkString(". 
")) println((1 to count).map(partitionId => new XORShiftRandom(partitionId).nextInt(partition)) .groupBy(f => f) .map(_._2.size).mkString(". ")) } distribution(200, 4) ``` The output: ``` 200 50. 60. 46. 44 55. 48. 43. 54 ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #38106 from wangyum/SPARK-40660. Authored-by: Yuming Wang Signed-off-by: Wenchen Fan --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 5 ++--- .../execution/exchange/ShuffleExchangeExec.scala | 5 ++--- .../scala/org/apache/spark/sql/DatasetSuite.scala | 14 +++++++++++++- .../adaptive/AdaptiveQueryExecSuite.scala | 4 ++-- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index d12804fc12b46..18f3f87f30fd2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -25,7 +25,6 @@ import scala.io.Codec import scala.language.implicitConversions import scala.ref.WeakReference import scala.reflect.{classTag, ClassTag} -import scala.util.hashing import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.io.{BytesWritable, NullWritable, Text} @@ -50,7 +49,7 @@ import org.apache.spark.util.Utils import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap, Utils => collectionUtils} import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler, - SamplingUtils} + SamplingUtils, XORShiftRandom} /** * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, @@ -505,7 +504,7 @@ abstract class RDD[T: ClassTag]( if (shuffle) { /** Distributes elements evenly across output partitions, starting from a random partition. */ val distributePartition = (index: Int, items: Iterator[T]) => { - var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions) + var position = new XORShiftRandom(index).nextInt(numPartitions) items.map { t => // Note that the hash code of the key will just be the key itself. The HashPartitioner // will mod it with the number of total partitions. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 6f287028f7401..806a048b244ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -17,11 +17,9 @@ package org.apache.spark.sql.execution.exchange -import java.util.Random import java.util.function.Supplier import scala.concurrent.Future -import scala.util.hashing import org.apache.spark._ import org.apache.spark.internal.config @@ -40,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.util.MutablePair import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator} +import org.apache.spark.util.random.XORShiftRandom /** * Common trait for all shuffle exchange implementations to facilitate pattern matching. @@ -307,7 +306,7 @@ object ShuffleExchangeExec { // end up being almost the same regardless of the index. substantially scrambling the // seed by hashing will help. Refer to SPARK-21782 for more details. 
val partitionId = TaskContext.get().partitionId() - var position = new Random(hashing.byteswap32(partitionId)).nextInt(numPartitions) + var position = new XORShiftRandom(partitionId).nextInt(numPartitions) (row: InternalRow) => { // The HashPartitioner will handle the `mod` by the number of partitions position += 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 7dc44b76e030a..48b434fbb3abc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.sql.{Date, Timestamp} +import org.apache.hadoop.fs.{Path, PathFilter} import org.scalatest.Assertions._ import org.scalatest.exceptions.TestFailedException import org.scalatest.prop.TableDrivenPropertyChecks._ @@ -2169,7 +2170,18 @@ class DatasetSuite extends QueryTest test("SPARK-40407: repartition should not result in severe data skew") { val df = spark.range(0, 100, 1, 50).repartition(4) val result = df.mapPartitions(iter => Iterator.single(iter.length)).collect() - assert(result.sorted.toSeq === Seq(19, 25, 25, 31)) + assert(result.sorted.toSeq === Seq(23, 25, 25, 27)) + } + + test("SPARK-40660: Switch to XORShiftRandom to distribute elements") { + withTempDir { dir => + spark.range(10).repartition(10).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath) + val fs = new Path(dir.getAbsolutePath).getFileSystem(spark.sessionState.newHadoopConf()) + val parquetFiles = fs.listStatus(new Path(dir.getAbsolutePath), new PathFilter { + override def accept(path: Path): Boolean = path.getName.endsWith("parquet") + }) + assert(parquetFiles.size === 10) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index fbaa6fea60bf9..81bce35a58451 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -2127,8 +2127,8 @@ class AdaptiveQueryExecSuite withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "150") { // partition size [0,258,72,72,72] checkPartitionNumber("SELECT /*+ REBALANCE(c1) */ * FROM v", 2, 4) - // partition size [144,72,144,216,144] - checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 2, 6) + // partition size [144,72,144,72,72,144,72] + checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 6, 7) } // no skewed partition should be optimized From 5600bef0ee6149ebc1abcf4c9c9b2991553ca3de Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 5 Oct 2022 17:40:42 +0900 Subject: [PATCH 8/9] [SPARK-40635][YARN][TESTS] Fix `yarn` module daily test failed with `hadoop2` ### What changes were proposed in this pull request? This pr adds the de duplication process in the `yarn.Client#populateClasspath` method when `Utils.isTesting` is true to ensure that `ENV_DIST_CLASSPATH` will only add the part of `extraClassPath` that does not exist to `CLASSPATH` to avoid `java.io.IOException: error=7, Argument list too long` in this way. ### Why are the changes needed? Fix daily test failed of yarn module with `-Phadoop-2`. 
[Daily test failed](https://github.com/apache/spark/actions/runs/3174476348/jobs/5171331515) as follows: ``` Exception message: Cannot run program "bash" (in directory "/home/runner/work/spark/spark/resource-managers/yarn/target/org.apache.spark.deploy.yarn.YarnClusterSuite/org.apache.spark.deploy.yarn.YarnClusterSuite-localDir-nm-0_0/usercache/runner/appcache/application_1664721938509_0027/container_1664721938509_0027_02_000001"): error=7, Argument list too long 22096[info] Stack trace: java.io.IOException: Cannot run program "bash" (in directory "/home/runner/work/spark/spark/resource-managers/yarn/target/org.apache.spark.deploy.yarn.YarnClusterSuite/org.apache.spark.deploy.yarn.YarnClusterSuite-localDir-nm-0_0/usercache/runner/appcache/application_1664721938509_0027/container_1664721938509_0027_02_000001"): error=7, Argument list too long 22097[info] at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048) 22098[info] at org.apache.hadoop.util.Shell.runCommand(Shell.java:526) 22099[info] at org.apache.hadoop.util.Shell.run(Shell.java:482) 22100[info] at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776) 22101[info] at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212) 22102[info] at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) 22103[info] at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) 22104[info] at java.util.concurrent.FutureTask.run(FutureTask.java:266) 22105[info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 22106[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 22107[info] at java.lang.Thread.run(Thread.java:750) 22108[info] Caused by: java.io.IOException: error=7, Argument list too long 22109[info] at java.lang.UNIXProcess.forkAndExec(Native Method) 22110[info] at java.lang.UNIXProcess.(UNIXProcess.java:247) 22111[info] at java.lang.ProcessImpl.start(ProcessImpl.java:134) 22112[info] at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029) 22113[info] ... 10 more 22114[info] ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Pass GitHub Actions - Verify test with `hadoop2` is successful: https://github.com/LuciferYang/spark/actions/runs/3175111616/jobs/5172833416 Closes #38079 from LuciferYang/SPARK-40635. 
Authored-by: yangjie01 Signed-off-by: Hyukjin Kwon --- .../scala/org/apache/spark/deploy/yarn/Client.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 46c3117a14b9b..2daf9cd1df432 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1470,6 +1470,11 @@ private[spark] object Client extends Logging { addClasspathEntry(getClusterPath(sparkConf, cp), env) } + val cpSet = extraClassPath match { + case Some(classPath) if Utils.isTesting => classPath.split(File.pathSeparator).toSet + case _ => Set.empty[String] + } + addClasspathEntry(Environment.PWD.$$(), env) addClasspathEntry(Environment.PWD.$$() + Path.SEPARATOR + LOCALIZED_CONF_DIR, env) @@ -1513,7 +1518,13 @@ private[spark] object Client extends Logging { } sys.env.get(ENV_DIST_CLASSPATH).foreach { cp => - addClasspathEntry(getClusterPath(sparkConf, cp), env) + // SPARK-40635: during the test, add a jar de-duplication process to avoid + // that the startup command can't be executed due to the too long classpath. + val newCp = if (Utils.isTesting) { + cp.split(File.pathSeparator) + .filterNot(cpSet.contains).mkString(File.pathSeparator) + } else cp + addClasspathEntry(getClusterPath(sparkConf, newCp), env) } // Add the localized Hadoop config at the end of the classpath, in case it contains other From 1e305643b3b4adfbfdab8ed181f55ea2e6b74f0d Mon Sep 17 00:00:00 2001 From: itholic Date: Wed, 5 Oct 2022 13:14:19 +0300 Subject: [PATCH 9/9] [SPARK-40663][SQL] Migrate execution errors onto error classes: _LEGACY_ERROR_TEMP_2000-2025 ### What changes were proposed in this pull request? This PR proposes to migrate 26 execution errors onto temporary error classes with the prefix `_LEGACY_ERROR_TEMP_2000` to `_LEGACY_ERROR_TEMP_2024`. The error classes are prefixed with `_LEGACY_ERROR_TEMP_` indicates the dev-facing error messages, and won't be exposed to end users. ### Why are the changes needed? To speed-up the error class migration. The migration on temporary error classes allow us to analyze the errors, so we can detect the most popular error classes. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? ``` $ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite" $ build/sbt "test:testOnly *SQLQuerySuite" $ build/sbt -Phadoop-3 -Phive-thriftserver catalyst/test hive-thriftserver/test ``` Closes #38104 from itholic/SPARK-40540-2000. 
Authored-by: itholic Signed-off-by: Max Gekk --- .../main/resources/error/error-classes.json | 130 ++++++++++++ .../sql/errors/QueryExecutionErrors.scala | 186 ++++++++++++------ .../aggregate/PercentileSuite.scala | 18 +- .../sql-tests/results/ansi/date.sql.out | 29 ++- .../sql-tests/results/ansi/timestamp.sql.out | 30 ++- .../sql-tests/results/postgreSQL/date.sql.out | 30 ++- .../results/regexp-functions.sql.out | 104 +++++++--- .../timestampNTZ/timestamp-ansi.sql.out | 30 ++- 8 files changed, 438 insertions(+), 119 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 653c4a6938f8d..d27a2cbde9728 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -3002,5 +3002,135 @@ "message" : [ "Failed to execute command because subquery expressions are not allowed in DEFAULT values." ] + }, + "_LEGACY_ERROR_TEMP_2000" : { + "message" : [ + ". If necessary set to false to bypass this error." + ] + }, + "_LEGACY_ERROR_TEMP_2001" : { + "message" : [ + " If necessary set to false to bypass this error" + ] + }, + "_LEGACY_ERROR_TEMP_2002" : { + "message" : [ + "" + ] + }, + "_LEGACY_ERROR_TEMP_2003" : { + "message" : [ + "Unsuccessful try to zip maps with unique keys due to exceeding the array size limit " + ] + }, + "_LEGACY_ERROR_TEMP_2004" : { + "message" : [ + "no default for type " + ] + }, + "_LEGACY_ERROR_TEMP_2005" : { + "message" : [ + "Type does not support ordered operations" + ] + }, + "_LEGACY_ERROR_TEMP_2006" : { + "message" : [ + "The specified group index cannot be less than zero" + ] + }, + "_LEGACY_ERROR_TEMP_2007" : { + "message" : [ + "Regex group count is , but the specified group index is " + ] + }, + "_LEGACY_ERROR_TEMP_2008" : { + "message" : [ + "Find an invalid url string . If necessary set to false to bypass this error." + ] + }, + "_LEGACY_ERROR_TEMP_2009" : { + "message" : [ + "dataType" + ] + }, + "_LEGACY_ERROR_TEMP_2010" : { + "message" : [ + "Window Functions do not support merging." + ] + }, + "_LEGACY_ERROR_TEMP_2011" : { + "message" : [ + "Unexpected data type " + ] + }, + "_LEGACY_ERROR_TEMP_2012" : { + "message" : [ + "Unexpected type " + ] + }, + "_LEGACY_ERROR_TEMP_2013" : { + "message" : [ + "Negative values found in " + ] + }, + "_LEGACY_ERROR_TEMP_2014" : { + "message" : [ + " is not matched at addNewFunction" + ] + }, + "_LEGACY_ERROR_TEMP_2015" : { + "message" : [ + "Cannot generate code for incomparable type: " + ] + }, + "_LEGACY_ERROR_TEMP_2016" : { + "message" : [ + "Can not interpolate into code block." + ] + }, + "_LEGACY_ERROR_TEMP_2017" : { + "message" : [ + "not resolved" + ] + }, + "_LEGACY_ERROR_TEMP_2018" : { + "message" : [ + "class `` is not supported by `MapObjects` as resulting collection." + ] + }, + "_LEGACY_ERROR_TEMP_2019" : { + "message" : [ + "Cannot use null as map key!" + ] + }, + "_LEGACY_ERROR_TEMP_2020" : { + "message" : [ + "Couldn't find a valid constructor on " + ] + }, + "_LEGACY_ERROR_TEMP_2021" : { + "message" : [ + "Couldn't find a primary constructor on " + ] + }, + "_LEGACY_ERROR_TEMP_2022" : { + "message" : [ + "Unsupported natural join type " + ] + }, + "_LEGACY_ERROR_TEMP_2023" : { + "message" : [ + "Unresolved encoder expected, but was found." + ] + }, + "_LEGACY_ERROR_TEMP_2024" : { + "message" : [ + "Only expression encoders are supported for now." 
+ ] + }, + "_LEGACY_ERROR_TEMP_2025" : { + "message" : [ + " must override either or " + ] } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 7c7561b3a71c3..5244a5283cdb2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -286,20 +286,29 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { summary = "") } - def ansiDateTimeError(e: DateTimeException): DateTimeException = { - val newMessage = s"${e.getMessage}. " + - s"If necessary set ${SQLConf.ANSI_ENABLED.key} to false to bypass this error." - new DateTimeException(newMessage, e.getCause) + def ansiDateTimeError(e: Exception): SparkDateTimeException = { + new SparkDateTimeException( + errorClass = "_LEGACY_ERROR_TEMP_2000", + errorSubClass = None, + messageParameters = Map( + "message" -> e.getMessage, + "ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key)), + context = Array.empty, + summary = "") } - def ansiIllegalArgumentError(message: String): IllegalArgumentException = { - val newMessage = s"$message. If necessary set ${SQLConf.ANSI_ENABLED.key} " + - s"to false to bypass this error." - new IllegalArgumentException(newMessage) + def ansiIllegalArgumentError(message: String): SparkIllegalArgumentException = { + new SparkIllegalArgumentException( + errorClass = "_LEGACY_ERROR_TEMP_2001", + messageParameters = Map( + "message" -> message, + "ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key))) } - def ansiIllegalArgumentError(e: IllegalArgumentException): IllegalArgumentException = { - ansiIllegalArgumentError(e.getMessage) + def ansiIllegalArgumentError(e: Exception): SparkIllegalArgumentException = { + new SparkIllegalArgumentException( + errorClass = "_LEGACY_ERROR_TEMP_2002", + messageParameters = Map("message" -> e.getMessage)) } def overflowInSumOfDecimalError(context: SQLQueryContext): ArithmeticException = { @@ -310,10 +319,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { arithmeticOverflowError("Overflow in integral divide", "try_divide", context) } - def mapSizeExceedArraySizeWhenZipMapError(size: Int): RuntimeException = { - new RuntimeException(s"Unsuccessful try to zip maps with $size " + - "unique keys due to exceeding the array size limit " + - s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + def mapSizeExceedArraySizeWhenZipMapError(size: Int): SparkRuntimeException = { + new SparkRuntimeException( + errorClass = "_LEGACY_ERROR_TEMP_2003", + messageParameters = Map( + "size" -> size.toString(), + "maxRoundedArrayLength" -> ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH.toString())) } def literalTypeUnsupportedError(v: Any): RuntimeException = { @@ -334,27 +345,41 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { "type" -> toSQLType(dataType))) } - def noDefaultForDataTypeError(dataType: DataType): RuntimeException = { - new RuntimeException(s"no default for type $dataType") + def noDefaultForDataTypeError(dataType: DataType): SparkRuntimeException = { + new SparkRuntimeException( + errorClass = "_LEGACY_ERROR_TEMP_2004", + messageParameters = Map("dataType" -> dataType.toString())) } - def orderedOperationUnsupportedByDataTypeError(dataType: DataType): Throwable = { - new IllegalArgumentException(s"Type $dataType does not support ordered operations") + def 
orderedOperationUnsupportedByDataTypeError( + dataType: DataType): SparkIllegalArgumentException = { + new SparkIllegalArgumentException( + errorClass = "_LEGACY_ERROR_TEMP_2005", + errorSubClass = None, + messageParameters = Map("dataType" -> dataType.toString())) } - def regexGroupIndexLessThanZeroError(): Throwable = { - new IllegalArgumentException("The specified group index cannot be less than zero") + def regexGroupIndexLessThanZeroError(): SparkIllegalArgumentException = { + new SparkIllegalArgumentException( + errorClass = "_LEGACY_ERROR_TEMP_2006", + messageParameters = Map.empty) } def regexGroupIndexExceedGroupCountError( - groupCount: Int, groupIndex: Int): Throwable = { - new IllegalArgumentException( - s"Regex group count is $groupCount, but the specified group index is $groupIndex") + groupCount: Int, groupIndex: Int): SparkIllegalArgumentException = { + new SparkIllegalArgumentException( + errorClass = "_LEGACY_ERROR_TEMP_2007", + messageParameters = Map( + "groupCount" -> groupCount.toString(), + "groupIndex" -> groupIndex.toString())) } - def invalidUrlError(url: UTF8String, e: URISyntaxException): Throwable = { - new IllegalArgumentException(s"Find an invalid url string ${url.toString}. " + - s"If necessary set ${SQLConf.ANSI_ENABLED.key} to false to bypass this error.", e) + def invalidUrlError(url: UTF8String, e: URISyntaxException): SparkIllegalArgumentException = { + new SparkIllegalArgumentException( + errorClass = "_LEGACY_ERROR_TEMP_2008", + messageParameters = Map( + "url" -> url.toString, + "ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key))) } def illegalUrlError(url: UTF8String): Throwable = { @@ -364,52 +389,74 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { ) } - def dataTypeOperationUnsupportedError(): Throwable = { - new UnsupportedOperationException("dataType") + def dataTypeOperationUnsupportedError(): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2009", + messageParameters = Map.empty) } - def mergeUnsupportedByWindowFunctionError(): Throwable = { - new UnsupportedOperationException("Window Functions do not support merging.") + def mergeUnsupportedByWindowFunctionError(): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2010", + messageParameters = Map.empty) } - def dataTypeUnexpectedError(dataType: DataType): Throwable = { - new UnsupportedOperationException(s"Unexpected data type ${dataType.catalogString}") + def dataTypeUnexpectedError(dataType: DataType): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2011", + messageParameters = Map("dataType" -> dataType.catalogString)) } - def typeUnsupportedError(dataType: DataType): Throwable = { - new IllegalArgumentException(s"Unexpected type $dataType") + def typeUnsupportedError(dataType: DataType): SparkIllegalArgumentException = { + new SparkIllegalArgumentException( + errorClass = "_LEGACY_ERROR_TEMP_2012", + messageParameters = Map("dataType" -> dataType.toString())) } - def negativeValueUnexpectedError(frequencyExpression : Expression): Throwable = { - new SparkException(s"Negative values found in ${frequencyExpression.sql}") + def negativeValueUnexpectedError( + frequencyExpression : Expression): SparkIllegalArgumentException = { + new SparkIllegalArgumentException( + errorClass = "_LEGACY_ERROR_TEMP_2013", + messageParameters = Map("frequencyExpression" -> 
frequencyExpression.sql)) } - def addNewFunctionMismatchedWithFunctionError(funcName: String): Throwable = { - new IllegalArgumentException(s"$funcName is not matched at addNewFunction") + def addNewFunctionMismatchedWithFunctionError(funcName: String): SparkIllegalArgumentException = { + new SparkIllegalArgumentException( + errorClass = "_LEGACY_ERROR_TEMP_2014", + messageParameters = Map("funcName" -> funcName)) } def cannotGenerateCodeForIncomparableTypeError( - codeType: String, dataType: DataType): Throwable = { - new IllegalArgumentException( - s"Cannot generate $codeType code for incomparable type: ${dataType.catalogString}") + codeType: String, dataType: DataType): SparkIllegalArgumentException = { + new SparkIllegalArgumentException( + errorClass = "_LEGACY_ERROR_TEMP_2015", + messageParameters = Map( + "codeType" -> codeType, + "dataType" -> dataType.catalogString)) } - def cannotInterpolateClassIntoCodeBlockError(arg: Any): Throwable = { - new IllegalArgumentException( - s"Can not interpolate ${arg.getClass.getName} into code block.") + def cannotInterpolateClassIntoCodeBlockError(arg: Any): SparkIllegalArgumentException = { + new SparkIllegalArgumentException( + errorClass = "_LEGACY_ERROR_TEMP_2016", + messageParameters = Map("arg" -> arg.getClass.getName)) } - def customCollectionClsNotResolvedError(): Throwable = { - new UnsupportedOperationException("not resolved") + def customCollectionClsNotResolvedError(): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2017", + messageParameters = Map.empty) } - def classUnsupportedByMapObjectsError(cls: Class[_]): RuntimeException = { - new RuntimeException(s"class `${cls.getName}` is not supported by `MapObjects` as " + - "resulting collection.") + def classUnsupportedByMapObjectsError(cls: Class[_]): SparkRuntimeException = { + new SparkRuntimeException( + errorClass = "_LEGACY_ERROR_TEMP_2018", + messageParameters = Map("cls" -> cls.getName)) } - def nullAsMapKeyNotAllowedError(): RuntimeException = { - new RuntimeException("Cannot use null as map key!") + def nullAsMapKeyNotAllowedError(): SparkRuntimeException = { + new SparkRuntimeException( + errorClass = "_LEGACY_ERROR_TEMP_2019", + messageParameters = Map.empty) } def methodNotDeclaredError(name: String): Throwable = { @@ -417,28 +464,41 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { s"""A method named "$name" is not declared in any enclosing class nor any supertype""") } - def constructorNotFoundError(cls: String): Throwable = { - new RuntimeException(s"Couldn't find a valid constructor on $cls") + def constructorNotFoundError(cls: String): SparkRuntimeException = { + new SparkRuntimeException( + errorClass = "_LEGACY_ERROR_TEMP_2020", + messageParameters = Map("cls" -> cls.toString())) } - def primaryConstructorNotFoundError(cls: Class[_]): Throwable = { - new RuntimeException(s"Couldn't find a primary constructor on $cls") + def primaryConstructorNotFoundError(cls: Class[_]): SparkRuntimeException = { + new SparkRuntimeException( + errorClass = "_LEGACY_ERROR_TEMP_2021", + messageParameters = Map("cls" -> cls.toString())) } - def unsupportedNaturalJoinTypeError(joinType: JoinType): Throwable = { - new RuntimeException("Unsupported natural join type " + joinType) + def unsupportedNaturalJoinTypeError(joinType: JoinType): SparkRuntimeException = { + new SparkRuntimeException( + errorClass = "_LEGACY_ERROR_TEMP_2022", + messageParameters = Map("joinType" -> joinType.toString())) } - def 
notExpectedUnresolvedEncoderError(attr: AttributeReference): Throwable = { - new RuntimeException(s"Unresolved encoder expected, but $attr was found.") + def notExpectedUnresolvedEncoderError(attr: AttributeReference): SparkRuntimeException = { + new SparkRuntimeException( + errorClass = "_LEGACY_ERROR_TEMP_2023", + messageParameters = Map("attr" -> attr.toString())) } - def unsupportedEncoderError(): Throwable = { - new RuntimeException("Only expression encoders are supported for now.") + def unsupportedEncoderError(): SparkRuntimeException = { + new SparkRuntimeException( + errorClass = "_LEGACY_ERROR_TEMP_2024", + messageParameters = Map.empty) } - def notOverrideExpectedMethodsError(className: String, m1: String, m2: String): Throwable = { - new RuntimeException(s"$className must override either $m1 or $m2") + def notOverrideExpectedMethodsError( + className: String, m1: String, m2: String): SparkRuntimeException = { + new SparkRuntimeException( + errorClass = "_LEGACY_ERROR_TEMP_2025", + messageParameters = Map("className" -> className, "m1" -> m1, "m2" -> m2)) } def failToConvertValueToJsonError(value: AnyRef, cls: Class[_], dataType: DataType): Throwable = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala index 5f7abbdfa0ad5..61ccaefb270a2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.catalyst.expressions.aggregate -import org.apache.spark.SparkException import org.apache.spark.SparkFunSuite +import org.apache.spark.SparkIllegalArgumentException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute @@ -321,13 +321,15 @@ class PercentileSuite extends SparkFunSuite { val buffer = new GenericInternalRow(new Array[Any](2)) agg.initialize(buffer) - val caught = - intercept[SparkException]{ - // Add some non-empty row with negative frequency - agg.update(buffer, InternalRow(1, -5)) - agg.eval(buffer) - } - assert(caught.getMessage.startsWith("Negative values found in ")) + checkError( + exception = + intercept[SparkIllegalArgumentException]{ + // Add some non-empty row with negative frequency + agg.update(buffer, InternalRow(1, -5)) + agg.eval(buffer) + }, + errorClass = "_LEGACY_ERROR_TEMP_2013", + parameters = Map("frequencyExpression" -> "CAST(boundreference() AS INT)")) } private def compareEquals( diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/date.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/date.sql.out index 3c16587188933..f8e88cdba0b9f 100644 --- a/sql/core/src/test/resources/sql-tests/results/ansi/date.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/ansi/date.sql.out @@ -50,8 +50,14 @@ select make_date(2000, 13, 1) -- !query schema struct<> -- !query output -java.time.DateTimeException -Invalid value for MonthOfYear (valid values 1 - 12): 13. If necessary set spark.sql.ansi.enabled to false to bypass this error. 
+org.apache.spark.SparkDateTimeException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2000", + "messageParameters" : { + "ansiConfig" : "\"spark.sql.ansi.enabled\"", + "message" : "Invalid value for MonthOfYear (valid values 1 - 12): 13" + } +} -- !query @@ -59,8 +65,14 @@ select make_date(2000, 1, 33) -- !query schema struct<> -- !query output -java.time.DateTimeException -Invalid value for DayOfMonth (valid values 1 - 28/31): 33. If necessary set spark.sql.ansi.enabled to false to bypass this error. +org.apache.spark.SparkDateTimeException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2000", + "messageParameters" : { + "ansiConfig" : "\"spark.sql.ansi.enabled\"", + "message" : "Invalid value for DayOfMonth (valid values 1 - 28/31): 33" + } +} -- !query @@ -226,8 +238,13 @@ select next_day("2015-07-23", "xx") -- !query schema struct<> -- !query output -java.lang.IllegalArgumentException -Illegal input for day of week: xx. If necessary set spark.sql.ansi.enabled to false to bypass this error. +org.apache.spark.SparkIllegalArgumentException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2002", + "messageParameters" : { + "message" : "Illegal input for day of week: xx" + } +} -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/timestamp.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/timestamp.sql.out index c80256d0a8b57..59137f5ea723f 100644 --- a/sql/core/src/test/resources/sql-tests/results/ansi/timestamp.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/ansi/timestamp.sql.out @@ -149,8 +149,14 @@ SELECT make_timestamp(1, 1, 1, 1, 1, 61) -- !query schema struct<> -- !query output -java.time.DateTimeException -Invalid value for SecondOfMinute (valid values 0 - 59): 61. If necessary set spark.sql.ansi.enabled to false to bypass this error. +org.apache.spark.SparkDateTimeException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2000", + "messageParameters" : { + "ansiConfig" : "\"spark.sql.ansi.enabled\"", + "message" : "Invalid value for SecondOfMinute (valid values 0 - 59): 61" + } +} -- !query @@ -174,8 +180,14 @@ SELECT make_timestamp(1, 1, 1, 1, 1, 99.999999) -- !query schema struct<> -- !query output -java.time.DateTimeException -Invalid value for SecondOfMinute (valid values 0 - 59): 99. If necessary set spark.sql.ansi.enabled to false to bypass this error. +org.apache.spark.SparkDateTimeException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2000", + "messageParameters" : { + "ansiConfig" : "\"spark.sql.ansi.enabled\"", + "message" : "Invalid value for SecondOfMinute (valid values 0 - 59): 99" + } +} -- !query @@ -183,8 +195,14 @@ SELECT make_timestamp(1, 1, 1, 1, 1, 999.999999) -- !query schema struct<> -- !query output -java.time.DateTimeException -Invalid value for SecondOfMinute (valid values 0 - 59): 999. If necessary set spark.sql.ansi.enabled to false to bypass this error. 
+org.apache.spark.SparkDateTimeException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2000", + "messageParameters" : { + "ansiConfig" : "\"spark.sql.ansi.enabled\"", + "message" : "Invalid value for SecondOfMinute (valid values 0 - 59): 999" + } +} -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/date.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/date.sql.out index 73ac127b9e056..1103aff05d808 100755 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/date.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/date.sql.out @@ -673,8 +673,14 @@ select make_date(2013, 2, 30) -- !query schema struct<> -- !query output -java.time.DateTimeException -Invalid date 'FEBRUARY 30'. If necessary set spark.sql.ansi.enabled to false to bypass this error. +org.apache.spark.SparkDateTimeException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2000", + "messageParameters" : { + "ansiConfig" : "\"spark.sql.ansi.enabled\"", + "message" : "Invalid date 'FEBRUARY 30'" + } +} -- !query @@ -682,8 +688,14 @@ select make_date(2013, 13, 1) -- !query schema struct<> -- !query output -java.time.DateTimeException -Invalid value for MonthOfYear (valid values 1 - 12): 13. If necessary set spark.sql.ansi.enabled to false to bypass this error. +org.apache.spark.SparkDateTimeException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2000", + "messageParameters" : { + "ansiConfig" : "\"spark.sql.ansi.enabled\"", + "message" : "Invalid value for MonthOfYear (valid values 1 - 12): 13" + } +} -- !query @@ -691,8 +703,14 @@ select make_date(2013, 11, -1) -- !query schema struct<> -- !query output -java.time.DateTimeException -Invalid value for DayOfMonth (valid values 1 - 28/31): -1. If necessary set spark.sql.ansi.enabled to false to bypass this error. 
+org.apache.spark.SparkDateTimeException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2000", + "messageParameters" : { + "ansiConfig" : "\"spark.sql.ansi.enabled\"", + "message" : "Invalid value for DayOfMonth (valid values 1 - 28/31): -1" + } +} -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/regexp-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/regexp-functions.sql.out index 9ef1802738a1f..adeea49a3e37a 100644 --- a/sql/core/src/test/resources/sql-tests/results/regexp-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/regexp-functions.sql.out @@ -4,8 +4,14 @@ SELECT regexp_extract('1a 2b 14m', '\\d+') -- !query schema struct<> -- !query output -java.lang.IllegalArgumentException -Regex group count is 0, but the specified group index is 1 +org.apache.spark.SparkIllegalArgumentException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2007", + "messageParameters" : { + "groupCount" : "0", + "groupIndex" : "1" + } +} -- !query @@ -21,8 +27,14 @@ SELECT regexp_extract('1a 2b 14m', '\\d+', 1) -- !query schema struct<> -- !query output -java.lang.IllegalArgumentException -Regex group count is 0, but the specified group index is 1 +org.apache.spark.SparkIllegalArgumentException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2007", + "messageParameters" : { + "groupCount" : "0", + "groupIndex" : "1" + } +} -- !query @@ -30,8 +42,14 @@ SELECT regexp_extract('1a 2b 14m', '\\d+', 2) -- !query schema struct<> -- !query output -java.lang.IllegalArgumentException -Regex group count is 0, but the specified group index is 2 +org.apache.spark.SparkIllegalArgumentException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2007", + "messageParameters" : { + "groupCount" : "0", + "groupIndex" : "2" + } +} -- !query @@ -39,8 +57,10 @@ SELECT regexp_extract('1a 2b 14m', '\\d+', -1) -- !query schema struct<> -- !query output -java.lang.IllegalArgumentException -The specified group index cannot be less than zero +org.apache.spark.SparkIllegalArgumentException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2006" +} -- !query @@ -96,8 +116,14 @@ SELECT regexp_extract('1a 2b 14m', '(\\d+)([a-z]+)', 3) -- !query schema struct<> -- !query output -java.lang.IllegalArgumentException -Regex group count is 2, but the specified group index is 3 +org.apache.spark.SparkIllegalArgumentException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2007", + "messageParameters" : { + "groupCount" : "2", + "groupIndex" : "3" + } +} -- !query @@ -105,8 +131,10 @@ SELECT regexp_extract('1a 2b 14m', '(\\d+)([a-z]+)', -1) -- !query schema struct<> -- !query output -java.lang.IllegalArgumentException -The specified group index cannot be less than zero +org.apache.spark.SparkIllegalArgumentException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2006" +} -- !query @@ -147,8 +175,14 @@ SELECT regexp_extract_all('1a 2b 14m', '\\d+') -- !query schema struct<> -- !query output -java.lang.IllegalArgumentException -Regex group count is 0, but the specified group index is 1 +org.apache.spark.SparkIllegalArgumentException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2007", + "messageParameters" : { + "groupCount" : "0", + "groupIndex" : "1" + } +} -- !query @@ -164,8 +198,14 @@ SELECT regexp_extract_all('1a 2b 14m', '\\d+', 1) -- !query schema struct<> -- !query output -java.lang.IllegalArgumentException -Regex group count is 0, but the specified group index is 1 +org.apache.spark.SparkIllegalArgumentException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2007", + "messageParameters" : { + "groupCount" : "0", + "groupIndex" : "1" + } +} -- !query @@ 
-173,8 +213,14 @@ SELECT regexp_extract_all('1a 2b 14m', '\\d+', 2) -- !query schema struct<> -- !query output -java.lang.IllegalArgumentException -Regex group count is 0, but the specified group index is 2 +org.apache.spark.SparkIllegalArgumentException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2007", + "messageParameters" : { + "groupCount" : "0", + "groupIndex" : "2" + } +} -- !query @@ -182,8 +228,10 @@ SELECT regexp_extract_all('1a 2b 14m', '\\d+', -1) -- !query schema struct<> -- !query output -java.lang.IllegalArgumentException -The specified group index cannot be less than zero +org.apache.spark.SparkIllegalArgumentException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2006" +} -- !query @@ -239,8 +287,14 @@ SELECT regexp_extract_all('1a 2b 14m', '(\\d+)([a-z]+)', 3) -- !query schema struct<> -- !query output -java.lang.IllegalArgumentException -Regex group count is 2, but the specified group index is 3 +org.apache.spark.SparkIllegalArgumentException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2007", + "messageParameters" : { + "groupCount" : "2", + "groupIndex" : "3" + } +} -- !query @@ -248,8 +302,10 @@ SELECT regexp_extract_all('1a 2b 14m', '(\\d+)([a-z]+)', -1) -- !query schema struct<> -- !query output -java.lang.IllegalArgumentException -The specified group index cannot be less than zero +org.apache.spark.SparkIllegalArgumentException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2006" +} -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/timestampNTZ/timestamp-ansi.sql.out b/sql/core/src/test/resources/sql-tests/results/timestampNTZ/timestamp-ansi.sql.out index ea3332f1e5639..51936c99ef4e6 100644 --- a/sql/core/src/test/resources/sql-tests/results/timestampNTZ/timestamp-ansi.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/timestampNTZ/timestamp-ansi.sql.out @@ -149,8 +149,14 @@ SELECT make_timestamp(1, 1, 1, 1, 1, 61) -- !query schema struct<> -- !query output -java.time.DateTimeException -Invalid value for SecondOfMinute (valid values 0 - 59): 61. If necessary set spark.sql.ansi.enabled to false to bypass this error. +org.apache.spark.SparkDateTimeException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2000", + "messageParameters" : { + "ansiConfig" : "\"spark.sql.ansi.enabled\"", + "message" : "Invalid value for SecondOfMinute (valid values 0 - 59): 61" + } +} -- !query @@ -174,8 +180,14 @@ SELECT make_timestamp(1, 1, 1, 1, 1, 99.999999) -- !query schema struct<> -- !query output -java.time.DateTimeException -Invalid value for SecondOfMinute (valid values 0 - 59): 99. If necessary set spark.sql.ansi.enabled to false to bypass this error. +org.apache.spark.SparkDateTimeException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2000", + "messageParameters" : { + "ansiConfig" : "\"spark.sql.ansi.enabled\"", + "message" : "Invalid value for SecondOfMinute (valid values 0 - 59): 99" + } +} -- !query @@ -183,8 +195,14 @@ SELECT make_timestamp(1, 1, 1, 1, 1, 999.999999) -- !query schema struct<> -- !query output -java.time.DateTimeException -Invalid value for SecondOfMinute (valid values 0 - 59): 999. If necessary set spark.sql.ansi.enabled to false to bypass this error. +org.apache.spark.SparkDateTimeException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2000", + "messageParameters" : { + "ansiConfig" : "\"spark.sql.ansi.enabled\"", + "message" : "Invalid value for SecondOfMinute (valid values 0 - 59): 999" + } +} -- !query
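
A note on the error-class pattern that the SPARK-40663 patch migrates to: instead of building exceptions from free-form strings, each error site now raises a Spark exception identified by a stable `errorClass` key plus named `messageParameters`, while the human-readable template lives centrally in `error-classes.json`. The sketch below is a minimal, self-contained model of that lookup-and-substitute flow; the names `ErrorTemplates` and `SimpleSparkException` and the `<name>` placeholder syntax are illustrative assumptions, not Spark's actual `SparkThrowable` machinery.

```scala
// Minimal model of the error-class mechanism the migration moves towards.
// The registry mirrors two entries added to error-classes.json in this patch;
// ErrorTemplates and SimpleSparkException are illustrative names, not Spark APIs.
object ErrorTemplates {
  private val templates: Map[String, String] = Map(
    "_LEGACY_ERROR_TEMP_2006" -> "The specified group index cannot be less than zero",
    "_LEGACY_ERROR_TEMP_2007" ->
      "Regex group count is <groupCount>, but the specified group index is <groupIndex>"
  )

  // Substitute <name> placeholders in the template with the supplied parameters.
  def format(errorClass: String, parameters: Map[String, String]): String = {
    val template = templates.getOrElse(errorClass, s"Unknown error class: $errorClass")
    parameters.foldLeft(template) { case (msg, (k, v)) => msg.replace(s"<$k>", v) }
  }
}

// Carries the machine-readable error class and parameters alongside the rendered
// message, so callers and tests can assert on structure instead of message text.
class SimpleSparkException(
    val errorClass: String,
    val messageParameters: Map[String, String])
  extends IllegalArgumentException(ErrorTemplates.format(errorClass, messageParameters))

object ErrorClassDemo {
  def main(args: Array[String]): Unit = {
    val e = new SimpleSparkException(
      errorClass = "_LEGACY_ERROR_TEMP_2007",
      messageParameters = Map("groupCount" -> "2", "groupIndex" -> "3"))
    // Prints: _LEGACY_ERROR_TEMP_2007: Regex group count is 2, but the specified group index is 3
    println(s"${e.errorClass}: ${e.getMessage}")
  }
}
```

In the patch itself this structured form is what the tests check, as in the `PercentileSuite` change above, which uses `checkError(exception = ..., errorClass = "_LEGACY_ERROR_TEMP_2013", parameters = Map("frequencyExpression" -> ...))` rather than matching on the formatted message string.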