Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$_log_="),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log_"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log__=")
) ++ Seq(
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.execution.datasources.DDLParser")
)
case v if v.startsWith("1.6") =>
Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,15 @@ descFuncNames
| functionIdentifier
;

looseIdentifier
:
Identifier
| looseNonReserved -> Identifier[$looseNonReserved.text]
// If it decides to support SQL11 reserved keywords, i.e., useSQL11ReservedKeywordsForIdentifier()=false,
// the sql11keywords in existing q tests will NOT be added back.
| {useSQL11ReservedKeywordsForIdentifier()}? sql11ReservedKeywordsUsedAsIdentifier -> Identifier[$sql11ReservedKeywordsUsedAsIdentifier.text]
;

identifier
:
Identifier
Expand All @@ -518,6 +527,10 @@ principalIdentifier
| QuotedIdentifier
;

looseNonReserved
: nonReserved | KW_FROM | KW_TO
;

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are allowed to use From and To in CreateTableUsing command's options (actually seems we can use any string as the option key). But we can't simply add them into nonReserved because by doing that we mess other existing rules. So we create a looseIdentifier and looseNonReserved here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not add this to the option rule directly?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I don't know if we will add other reserved words later. If so, the option rule might be too long. I don't count if any keywords are not included in nonReserved.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both (current approach or adding it to the option rule) are okay for me.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could add your initial line commentaar as a comment in the code?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reminding. I've added it.

//The new version of nonReserved + sql11ReservedKeywordsUsedAsIdentifier = old version of nonReserved
//Non reserved keywords are basically the keywords that can be used as identifiers.
//All the KW_* are automatically not only keywords, but also reserved keywords.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ KW_ISOLATION: 'ISOLATION';
KW_LEVEL: 'LEVEL';
KW_SNAPSHOT: 'SNAPSHOT';
KW_AUTOCOMMIT: 'AUTOCOMMIT';
KW_REFRESH: 'REFRESH';
KW_OPTIONS: 'OPTIONS';
KW_WEEK: 'WEEK'|'WEEKS';
KW_MILLISECOND: 'MILLISECOND'|'MILLISECONDS';
KW_MICROSECOND: 'MICROSECOND'|'MICROSECONDS';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ TOK_UNIONTYPE;
TOK_COLTYPELIST;
TOK_CREATEDATABASE;
TOK_CREATETABLE;
TOK_CREATETABLEUSING;
TOK_TRUNCATETABLE;
TOK_CREATEINDEX;
TOK_CREATEINDEX_INDEXTBLNAME;
Expand Down Expand Up @@ -371,6 +372,10 @@ TOK_TXN_READ_WRITE;
TOK_COMMIT;
TOK_ROLLBACK;
TOK_SET_AUTOCOMMIT;
TOK_REFRESHTABLE;
TOK_TABLEPROVIDER;
TOK_TABLEOPTIONS;
TOK_TABLEOPTION;
}


Expand Down Expand Up @@ -764,6 +769,7 @@ ddlStatement
| truncateTableStatement
| alterStatement
| descStatement
| refreshStatement
| showStatement
| metastoreCheck
| createViewStatement
Expand Down Expand Up @@ -890,12 +896,31 @@ createTableStatement
@init { pushMsg("create table statement", state); }
@after { popMsg(state); }
: KW_CREATE (temp=KW_TEMPORARY)? (ext=KW_EXTERNAL)? KW_TABLE ifNotExists? name=tableName
( like=KW_LIKE likeName=tableName
(
like=KW_LIKE likeName=tableName
tableRowFormat?
tableFileFormat?
tableLocation?
tablePropertiesPrefixed?
-> ^(TOK_CREATETABLE $name $temp? $ext? ifNotExists?
^(TOK_LIKETABLE $likeName?)
tableRowFormat?
tableFileFormat?
tableLocation?
tablePropertiesPrefixed?
)
|
tableProvider
tableOpts?
(KW_AS selectStatementWithCTE)?
-> ^(TOK_CREATETABLEUSING $name $temp? ifNotExists?
tableProvider
tableOpts?
selectStatementWithCTE?
)
| (LPAREN columnNameTypeList RPAREN)?
(p=tableProvider?)
tableOpts?
tableComment?
tablePartition?
tableBuckets?
Expand All @@ -905,8 +930,15 @@ createTableStatement
tableLocation?
tablePropertiesPrefixed?
(KW_AS selectStatementWithCTE)?
)
-> ^(TOK_CREATETABLE $name $temp? $ext? ifNotExists?
-> {p != null}?
^(TOK_CREATETABLEUSING $name $temp? ifNotExists?
columnNameTypeList?
$p
tableOpts?
selectStatementWithCTE?
)
->
^(TOK_CREATETABLE $name $temp? $ext? ifNotExists?
^(TOK_LIKETABLE $likeName?)
columnNameTypeList?
tableComment?
Expand All @@ -918,7 +950,8 @@ createTableStatement
tableLocation?
tablePropertiesPrefixed?
selectStatementWithCTE?
)
)
)
;

truncateTableStatement
Expand Down Expand Up @@ -1362,6 +1395,13 @@ tabPartColTypeExpr
: tableName partitionSpec? extColumnName? -> ^(TOK_TABTYPE tableName partitionSpec? extColumnName?)
;

refreshStatement
@init { pushMsg("refresh statement", state); }
@after { popMsg(state); }
:
KW_REFRESH KW_TABLE tableName -> ^(TOK_REFRESHTABLE tableName)
;

descStatement
@init { pushMsg("describe statement", state); }
@after { popMsg(state); }
Expand Down Expand Up @@ -1757,6 +1797,30 @@ showStmtIdentifier
| StringLiteral
;

tableProvider
@init { pushMsg("table's provider", state); }
@after { popMsg(state); }
:
KW_USING Identifier (DOT Identifier)*
-> ^(TOK_TABLEPROVIDER Identifier+)
;

optionKeyValue
@init { pushMsg("table's option specification", state); }
@after { popMsg(state); }
:
looseIdentifier StringLiteral
-> ^(TOK_TABLEOPTION looseIdentifier StringLiteral)
;

tableOpts
@init { pushMsg("table's options", state); }
@after { popMsg(state); }
:
KW_OPTIONS LPAREN optionKeyValue (COMMA optionKeyValue)* RPAREN
-> ^(TOK_TABLEOPTIONS optionKeyValue+)
;

tableComment
@init { pushMsg("table's comment", state); }
@after { popMsg(state); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,7 @@ class SQLContext private[sql](
@transient
protected[sql] val sqlParser: ParserInterface = new SparkSQLParser(new SparkQl(conf))

@transient
protected[sql] val ddlParser: DDLParser = new DDLParser(sqlParser)

protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)
protected[sql] def parseSql(sql: String): LogicalPlan = sqlParser.parsePlan(sql)

protected[sql] def executeSql(sql: String):
org.apache.spark.sql.execution.QueryExecution = executePlan(parseSql(sql))
Expand Down
106 changes: 99 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
*/
package org.apache.spark.sql.execution

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.parser.{ASTNode, ParserConf, SimpleParserConf}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.StructType

private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl(conf) {
/** Check if a command should not be explained. */
Expand All @@ -42,6 +45,91 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs)
ExplainCommand(nodeToPlan(query), extended = extended.isDefined)

case Token("TOK_REFRESHTABLE", nameParts :: Nil) =>
val tableIdent = extractTableIdent(nameParts)
RefreshTable(tableIdent)

case Token("TOK_CREATETABLEUSING", createTableArgs) =>
val Seq(
temp,
allowExisting,
Some(tabName),
tableCols,
Some(Token("TOK_TABLEPROVIDER", providerNameParts)),
tableOpts,
tableAs) = getClauses(Seq(
"TEMPORARY",
"TOK_IFNOTEXISTS",
"TOK_TABNAME", "TOK_TABCOLLIST",
"TOK_TABLEPROVIDER",
"TOK_TABLEOPTIONS",
"TOK_QUERY"), createTableArgs)

val tableIdent: TableIdentifier = tabName match {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we use extractTableIdent?

case Token("TOK_TABNAME", Token(dbName, _) :: Token(tableName, _) :: Nil) =>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am personally not big on using wildcards in pattern matches. This prevents us from catching a grammar problem early. Since most of the wildcards are actually empty lists, why not match these?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get understood. I think I don't use wildcards in pattern matches here?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As soon as you use an underscore in a pattern match, for example: Token(dbName, _), you are using a wildcard in the second position.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, I see, just not sure where you are pointing to. I was wondering I don't have case _ branch. Thanks.

new TableIdentifier(cleanIdentifier(tableName), Some(cleanIdentifier(dbName)))
case Token("TOK_TABNAME", Token(tableName, _) :: Nil) =>
TableIdentifier(cleanIdentifier(tableName))
}

val columns = tableCols.map {
case Token("TOK_TABCOLLIST", fields) => StructType(fields.map(nodeToStructField))
}

val provider = providerNameParts.map {
case Token(name, _) => name
}.mkString(".")

val options = tableOpts.map { opts =>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: You could also turn the option into a sequence, flatMap over it, and create the Map as a result:

val options = tableOpts.toSeq.flatMap {
  case Token("TOK_TABLEOPTIONS", options) =>
    options.map {
      case Token("TOK_TABLEOPTION", keysAndValue) =>
        val key = keysAndValue.init.map.(_.text).mkString(".")
        val value = unquoteString(keysAndValue.last.text)
        key -> value
    }
}.toMap

(note: code is not tested)

opts match {
case Token("TOK_TABLEOPTIONS", options) =>
options.map {
case Token("TOK_TABLEOPTION", Token(key, _) :: Token(value, _) :: Nil) =>
(key, unquoteString(value))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unquoting twice?

}.asInstanceOf[Seq[(String, String)]].toMap

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this cast needed?

}
}.getOrElse(Map.empty[String, String])

val asClause = tableAs.map(nodeToPlan(_))

if (temp.isDefined && allowExisting.isDefined) {
throw new DDLException(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need a DDL exception? Why not throw an analysis exception?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually here the thrown exception will be caught in CatalystQl which will then throw an AnalysisException. But I agreed that we can remove DDLException. Only DDLParser uses it.

"a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
}

if (asClause.isDefined) {
if (columns.isDefined) {
throw new DDLException(
"a CREATE TABLE AS SELECT statement does not allow column definitions.")
}

val mode = if (allowExisting.isDefined) {
SaveMode.Ignore
} else if (temp.isDefined) {
SaveMode.Overwrite
} else {
SaveMode.ErrorIfExists
}

CreateTableUsingAsSelect(tableIdent,
provider,
temp.isDefined,
Array.empty[String],
bucketSpec = None,
mode,
options,
asClause.get)
} else {
CreateTableUsing(
tableIdent,
columns,
provider,
temp.isDefined,
options,
allowExisting.isDefined,
managedIfNoPath = false)
}

case Token("TOK_DESCTABLE", describeArgs) =>
// Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
val Some(tableType) :: formatted :: extended :: pretty :: Nil =
Expand All @@ -52,26 +140,30 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
nodeToDescribeFallback(node)
} else {
tableType match {
case Token("TOK_TABTYPE", Token("TOK_TABNAME", nameParts :: Nil) :: Nil) =>
case Token("TOK_TABTYPE", Token("TOK_TABNAME", nameParts) :: Nil) =>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this? You didn't touch the describe stuff in SparkSqlParser.g right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I think it is incorrect from beginning but not be tested it out because we don't reach here before. I've tested it locally. Once all three commands are migrated, we can see this passing tests.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we parse the following SQL using the parse driver org.apache.spark.sql.catalyst.parser.ParseDriver.parsePlan("DESCRIBE EXTENDED tbl.a", null)

We would end up with the following AST:

TOK_DESCTABLE 1, 0, 6, 18
:- TOK_TABTYPE 1, 4, 6, 18 
:  +- TOK_TABNAME 1, 4, 6, 18 
:     :- tbl 1, 4, 4, 18 
:     +- a 1, 6, 6, 22 
+- EXTENDED 1, 2, 2, 9 

This change would pick this up, and old code didn't (I am sure I tested this though :S ). You can disable this in the DDL parser, to see if it works now.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a test for this? The Hive test suite apparently misses this one. I could also address in a different PR.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we have test for describe table command in HiveQuerySuite. Do we need another test?

nameParts match {
case Token(".", dbName :: tableName :: Nil) =>
case Token(dbName, _) :: Token(tableName, _) :: Nil =>
// It is describing a table with the format like "describe db.table".
// TODO: Actually, a user may mean tableName.columnName. Need to resolve this
// issue.
val tableIdent = extractTableIdent(nameParts)
val tableIdent = TableIdentifier(
cleanIdentifier(tableName), Some(cleanIdentifier(dbName)))
datasources.DescribeCommand(
UnresolvedRelation(tableIdent, None), isExtended = extended.isDefined)
case Token(".", dbName :: tableName :: colName :: Nil) =>
case Token(dbName, _) :: Token(tableName, _) :: Token(colName, _) :: Nil =>
// It is describing a column with the format like "describe db.table column".
nodeToDescribeFallback(node)
case tableName =>
case tableName :: Nil =>
// It is describing a table with the format like "describe table".
datasources.DescribeCommand(
UnresolvedRelation(TableIdentifier(tableName.text), None),
UnresolvedRelation(TableIdentifier(cleanIdentifier(tableName.text)), None),
isExtended = extended.isDefined)
case _ =>
nodeToDescribeFallback(node)
}
// All other cases.
case _ => nodeToDescribeFallback(node)
case _ =>
nodeToDescribeFallback(node)
}
}

Expand Down
Loading