[SPARK-5213] [SQL] Pluggable SQL Parser Support #5827
New file: Dialect.scala (package org.apache.spark.sql.catalyst)

@@ -0,0 +1,33 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
/**
 * Root class of the SQL parser dialect. Binary compatibility is not
 * guaranteed across future releases, so this is kept as an internal
 * interface for advanced users.
 */
@DeveloperApi
abstract class Dialect {
  // The main entry point: each SQL parser dialect implements this method.
  def parse(sqlText: String): LogicalPlan
}
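Since spark.sql.dialect accepts any class name that extends Dialect and has a zero-argument constructor, a minimal custom dialect could look like the sketch below. This is a hedged illustration, not part of the PR: com.example.MyDialect is hypothetical, and it simply delegates to the stock catalyst parser.

```scala
package com.example

import org.apache.spark.sql.catalyst.{Dialect, SqlParser}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical custom dialect: delegates straight to the built-in catalyst
// parser. A real dialect would substitute its own grammar here.
class MyDialect extends Dialect {
  @transient protected val parser = new SqlParser

  override def parse(sqlText: String): LogicalPlan = parser.parse(sqlText)
}
```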
Changes to SQLContext.scala (package org.apache.spark.sql):
@@ -24,6 +24,7 @@ import scala.collection.JavaConversions._
 import scala.collection.immutable
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
+import scala.util.control.NonFatal

 import com.google.common.reflect.TypeToken
@@ -32,9 +33,11 @@ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.Dialect
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, expressions}
 import org.apache.spark.sql.execution.{Filter, _}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
@@ -44,6 +47,45 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 import org.apache.spark.{Partition, SparkContext}

+/**
+ * Currently we support the default dialect, named "sql" and backed by the class
+ * [[DefaultDialect]].
+ *
+ * A custom SQL dialect can also be provided, for example in the Spark SQL CLI:
+ * {{{
+ * -- switch to the "hiveql" dialect
+ * spark-sql> SET spark.sql.dialect=hiveql;
+ * spark-sql> SELECT * FROM src LIMIT 1;
+ *
+ * -- switch to the "sql" dialect
+ * spark-sql> SET spark.sql.dialect=sql;
+ * spark-sql> SELECT * FROM src LIMIT 1;
+ *
+ * -- register a custom SQL dialect
+ * spark-sql> SET spark.sql.dialect=com.xxx.xxx.SQL99Dialect;
+ * spark-sql> SELECT * FROM src LIMIT 1;
+ *
+ * -- register a non-existent SQL dialect
+ * spark-sql> SET spark.sql.dialect=NotExistedClass;
+ * spark-sql> SELECT * FROM src LIMIT 1;
+ *
+ * -- an exception will be thrown, and the dialect reverts to
+ * -- "sql" (for SQLContext) or
+ * -- "hiveql" (for HiveContext)
+ * }}}
+ */
+private[spark] class DefaultDialect extends Dialect {
+  @transient
+  protected val sqlParser = {
+    val catalystSqlParser = new catalyst.SqlParser
+    new SparkSQLParser(catalystSqlParser.parse)
+  }
+
+  override def parse(sqlText: String): LogicalPlan = {
+    sqlParser.parse(sqlText)
+  }
+}

 /**
  * The entry point for working with structured data (rows and columns) in Spark. Allows the
  * creation of [[DataFrame]] objects as well as the execution of SQL queries.
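The scaladoc above shows the CLI flow; the same switch works programmatically, since the dialect is just a SQL conf entry. A minimal sketch, assuming a registered table `src` and the hypothetical com.example.MyDialect from earlier:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc: SparkContext = ???          // placeholder: supply a real SparkContext
val sqlContext = new SQLContext(sc)

// The built-in "sql" dialect is the default for SQLContext.
sqlContext.setConf("spark.sql.dialect", "sql")
sqlContext.sql("SELECT * FROM src LIMIT 1")

// Any other value is interpreted as the fully qualified name of a Dialect
// subclass with a zero-argument constructor (MyDialect is hypothetical).
sqlContext.setConf("spark.sql.dialect", "com.example.MyDialect")
sqlContext.sql("SELECT * FROM src LIMIT 1")
```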
@@ -132,17 +174,31 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer

   @transient
-  protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))
+  protected[sql] val ddlParser = new DDLParser((sql: String) => { getSQLDialect().parse(sql) })

+  // kept sqlParser to make sure mima tests pass
   @transient
-  protected[sql] val sqlParser = {
-    val fallback = new catalyst.SqlParser
-    new SparkSQLParser(fallback.parse(_))
-  }
+  protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))

+  protected[sql] def getSQLDialect(): Dialect = {
+    try {
+      val clazz = Utils.classForName(dialectClassName)
+      clazz.newInstance().asInstanceOf[Dialect]
+    } catch {
+      case NonFatal(e) =>
+        // The configured dialect cannot be instantiated, so even a SET command
+        // (e.g. SET spark.sql.dialect=sql) would fail.
+        val dialect = conf.dialect
+        // Reset to the default dialect automatically.
+        conf.unsetConf(SQLConf.DIALECT)
+        // Rethrow; the default dialect will take effect for the next query.
+        throw new DialectException(
+          s"""Instantiating dialect '$dialect' failed.
+             |Reverting to default dialect '${conf.dialect}'""".stripMargin, e)
+    }
+  }

-  protected[sql] def parseSql(sql: String): LogicalPlan = {
-    ddlParser.parse(sql, false).getOrElse(sqlParser.parse(sql))
-  }
+  protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)

   protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))

Review thread on the "kept sqlParser" comment:

Contributor: Remove this comment? It will probably be confusing once this PR is merged.

Contributor (Author): ok

Review thread on sqlParser:

Contributor (Author): @chenghao-intel this sqlParser actually will not be used for now; it's placed here just to fix the MiMa test.

Contributor: I think we'd better keep it, not just for the MiMa test, but also for the subclass of …

Contributor (Author): Agree to keep it, and the dialect parser should not use SparkSQLParser. The Dialect gives a fallback (String => LogicalPlan) and we call it in sqlParser.
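A hedged sketch of the fallback path in getSQLDialect above, assuming a SQLContext named `sqlContext`, a registered table `src`, and that NotExistedClass is not on the classpath:

```scala
import org.apache.spark.sql.catalyst.errors.DialectException

// The SET itself succeeds because the current dialect ("sql") still parses it.
sqlContext.sql("SET spark.sql.dialect=NotExistedClass")

try {
  sqlContext.sql("SELECT * FROM src LIMIT 1") // dialect instantiation fails here
} catch {
  case e: DialectException =>
    // Message reads roughly: "Instantiating dialect 'NotExistedClass' failed.
    // Reverting to default dialect 'sql'"
    println(e.getMessage)
}

// The conf was unset in the catch block, so the default "sql" dialect
// takes effect again and this query parses normally.
sqlContext.sql("SELECT * FROM src LIMIT 1")
```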
@@ -156,6 +212,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @transient
   protected[sql] val defaultSession = createSession()

+  protected[sql] def dialectClassName = if (conf.dialect == "sql") {
+    classOf[DefaultDialect].getCanonicalName
+  } else {
+    conf.dialect
+  }

   sparkContext.getConf.getAll.foreach {
     case (key, value) if key.startsWith("spark.sql") => setConf(key, value)
     case _ =>
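To make the resolution rule concrete, here is a small standalone sketch mirroring dialectClassName (the canonical name of DefaultDialect follows from this file's package, org.apache.spark.sql; com.example.MyDialect is hypothetical):

```scala
// The shorthand "sql" resolves to the built-in DefaultDialect; any other
// value is treated as a fully qualified Dialect class name.
def resolveDialectClassName(dialect: String): String =
  if (dialect == "sql") "org.apache.spark.sql.DefaultDialect" else dialect

assert(resolveDialectClassName("sql") == "org.apache.spark.sql.DefaultDialect")
assert(resolveDialectClassName("com.example.MyDialect") == "com.example.MyDialect")
```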
@@ -945,11 +1007,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group basic
    */
   def sql(sqlText: String): DataFrame = {
-    if (conf.dialect == "sql") {
-      DataFrame(this, parseSql(sqlText))
-    } else {
-      sys.error(s"Unsupported SQL dialect: ${conf.dialect}")
-    }
+    DataFrame(this, parseSql(sqlText))
   }

   /**

With the dialect resolved inside parseSql, sql() no longer needs to special-case conf.dialect: any configured dialect flows through the same path.
Review thread on DefaultDialect's sqlParser:

Contributor: My bad; `sqlParser` should just be `SqlParser`, not the `SparkSQLParser`.

Contributor: I think it should be `SparkSQLParser` here; otherwise we cannot parse the cache command.
https://github.com/apache/spark/pull/5827/files#diff-131c27c6a1f59770d738b11f2a4755ecR180
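To illustrate the point about the cache command, a hedged sketch (assumes a SQLContext named `sqlContext` and a registered table `src`): SparkSQLParser recognizes Spark-specific statements itself and hands everything else to the fallback parser it wraps.

```scala
// If DefaultDialect wrapped only the bare catalyst SqlParser, the first
// statement below would fail to parse, since CACHE is not part of the
// catalyst grammar.
sqlContext.sql("CACHE TABLE src")      // recognized by SparkSQLParser itself
sqlContext.sql("SELECT * FROM src")    // delegated to the catalyst SqlParser
```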