From 74f55a6f3816d9e5a66e5fc9c3b413520b5eab40 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 20 Apr 2016 18:13:32 -0700 Subject: [PATCH 01/13] [SPARK-14782][SQL] Remove HiveConf dependency from HiveSqlAstBuilder --- .../apache/spark/sql/internal/SQLConf.scala | 2 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 11 +++-- .../spark/sql/hive/HiveSessionState.scala | 7 +-- .../sql/hive/execution/HiveSqlParser.scala | 47 ++++++++----------- 4 files changed, 27 insertions(+), 40 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 27705520505a..6e7c1bc1333a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -289,7 +289,7 @@ object SQLConf { val DEFAULT_DATA_SOURCE_NAME = SQLConfigBuilder("spark.sql.sources.default") .doc("The default data source to use in input/output.") .stringConf - .createWithDefault("org.apache.spark.sql.parquet") + .createWithDefault("parquet") // This is used to control the when we will split a schema's JSON string to multiple pieces // in order to fit the JSON string in metastore's table property (by default, the value has diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 33a926e4d255..c20b022e84e9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -24,7 +24,6 @@ import com.google.common.base.Objects import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.common.StatsSetupConst -import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.{TableType => HiveTableType} import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable, _} @@ -46,6 +45,7 @@ import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => Parq import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.hive.execution.HiveNativeCommand import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ private[hive] case class HiveSerDe( @@ -59,10 +59,10 @@ private[hive] object HiveSerDe { * * @param source Currently the source abbreviation can be one of the following: * SequenceFile, RCFile, ORC, PARQUET, and case insensitive. 
- * @param hiveConf Hive Conf + * @param conf SQLConf * @return HiveSerDe associated with the specified source */ - def sourceToSerDe(source: String, hiveConf: HiveConf): Option[HiveSerDe] = { + def sourceToSerDe(source: String, conf: SQLConf): Option[HiveSerDe] = { val serdeMap = Map( "sequencefile" -> HiveSerDe( @@ -73,7 +73,8 @@ private[hive] object HiveSerDe { HiveSerDe( inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"), outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"), - serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE))), + serde = Option(conf.getConfString("hive.default.rcfile.serde", + "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))), "orc" -> HiveSerDe( @@ -297,7 +298,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging { CatalogTableType.MANAGED_TABLE } - val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hiveconf) + val maybeSerDe = HiveSerDe.sourceToSerDe(provider, conf) val dataSource = DataSource( hive, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 09297c27dc5b..2c360cb7ca8f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -53,11 +53,6 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) */ lazy val metadataHive: HiveClient = sharedState.metadataHive.newSession() - /** - * A Hive helper class for substituting variables in a SQL statement. - */ - lazy val substitutor = new VariableSubstitution - override lazy val conf: SQLConf = new SQLConf { override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false) } @@ -114,7 +109,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) /** * Parser for HiveQl query texts. */ - override lazy val sqlParser: ParserInterface = new HiveSqlParser(substitutor, hiveconf) + override lazy val sqlParser: ParserInterface = new HiveSqlParser(conf, hiveconf) /** * Planner that takes into account Hive-specific strategies. 
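
For reference, the HiveConf-to-SQLConf switch above reduces to string-keyed lookups with explicit fallbacks. A minimal sketch of the pattern, assuming a bare SQLConf; the key and fallback serde are the ones used in the HiveMetastoreCatalog hunk above, and nothing beyond that is implied:

    import org.apache.spark.sql.internal.SQLConf

    val conf = new SQLConf
    // Returns the configured value, or the supplied default when the key is unset.
    val rcfileSerde = conf.getConfString(
      "hive.default.rcfile.serde",
      "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")
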
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala index 4ff02cdbd0b3..8f270f762852 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala @@ -17,13 +17,12 @@ package org.apache.spark.sql.hive.execution import scala.collection.JavaConverters._ +import scala.util.Try import org.antlr.v4.runtime.{ParserRuleContext, Token} import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hadoop.hive.ql.parse.{EximUtil, VariableSubstitution} +import org.apache.hadoop.hive.ql.parse.VariableSubstitution import org.apache.hadoop.hive.serde.serdeConstants -import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.parser._ @@ -32,18 +31,16 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkSqlAstBuilder import org.apache.spark.sql.execution.command.{CreateTable, CreateTableLike} import org.apache.spark.sql.hive.{CreateTableAsSelect => CTAS, CreateViewAsSelect => CreateView, HiveSerDe} -import org.apache.spark.sql.hive.{HiveGenericUDTF, HiveMetastoreTypes, HiveSerDe} -import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper +import org.apache.spark.sql.internal.SQLConf /** * Concrete parser for HiveQl statements. */ -class HiveSqlParser( - substitutor: VariableSubstitution, - hiveconf: HiveConf) - extends AbstractSqlParser { +class HiveSqlParser(conf: SQLConf, hiveconf: HiveConf) extends AbstractSqlParser { - val astBuilder = new HiveSqlAstBuilder(hiveconf) + val astBuilder = new HiveSqlAstBuilder(conf) + + lazy val substitutor = new VariableSubstitution protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = { super.parse(substitutor.substitute(hiveconf, command))(toResult) @@ -57,7 +54,7 @@ class HiveSqlParser( /** * Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier. */ -class HiveSqlAstBuilder(hiveConf: HiveConf) extends SparkSqlAstBuilder { +class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder { import ParserUtils._ /** @@ -184,8 +181,8 @@ class HiveSqlAstBuilder(hiveConf: HiveConf) extends SparkSqlAstBuilder { // Storage format val defaultStorage: CatalogStorageFormat = { - val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT) - val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf) + val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") + val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf) CatalogStorageFormat( locationUri = None, inputFormat = defaultHiveSerde.flatMap(_.inputFormat) @@ -323,7 +320,7 @@ class HiveSqlAstBuilder(hiveConf: HiveConf) extends SparkSqlAstBuilder { // Decode and input/output format. type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) - def format(fmt: RowFormatContext, confVar: ConfVars): Format = fmt match { + def format(fmt: RowFormatContext, configKey: String): Format = fmt match { case c: RowFormatDelimitedContext => // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema // expects a seq of pairs in which the old parsers' token names are used as keys. 
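
One behavioral detail behind the configKey signature above: HiveConf.getVar falls back to Hive's built-in default, whereas SQLConf.getConfString(key) without a default throws when the key is unset, which is why the record reader/writer lookups below are wrapped in Try(...).toOption. A small sketch of that idiom, assuming a bare SQLConf with the key left unset:

    import scala.util.Try
    import org.apache.spark.sql.internal.SQLConf

    val conf = new SQLConf
    // None when "hive.script.recordreader" has not been set, Some(value) otherwise.
    val recordHandler: Option[String] =
      Try(conf.getConfString("hive.script.recordreader")).toOption
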
@@ -345,8 +342,8 @@ class HiveSqlAstBuilder(hiveConf: HiveConf) extends SparkSqlAstBuilder { val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c) // SPARK-10310: Special cases LazySimpleSerDe - val recordHandler = if (name == classOf[LazySimpleSerDe].getCanonicalName) { - Option(hiveConf.getVar(confVar)) + val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { + Try(conf.getConfString(configKey)).toOption } else { None } @@ -354,17 +351,18 @@ class HiveSqlAstBuilder(hiveConf: HiveConf) extends SparkSqlAstBuilder { case null => // Use default (serde) format. - val name = hiveConf.getVar(ConfVars.HIVESCRIPTSERDE) + val name = conf.getConfString("hive.script.serde", + "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") val props = Seq(serdeConstants.FIELD_DELIM -> "\t") - val recordHandler = Option(hiveConf.getVar(confVar)) + val recordHandler = Try(conf.getConfString(configKey)).toOption (Nil, Option(name), props, recordHandler) } val (inFormat, inSerdeClass, inSerdeProps, reader) = - format(inRowFormat, ConfVars.HIVESCRIPTRECORDREADER) + format(inRowFormat, "hive.script.serde") val (outFormat, outSerdeClass, outSerdeProps, writer) = - format(inRowFormat, ConfVars.HIVESCRIPTRECORDWRITER) + format(outRowFormat, "hive.script.recordwriter") HiveScriptIOSchema( inFormat, outFormat, @@ -374,13 +372,6 @@ class HiveSqlAstBuilder(hiveConf: HiveConf) extends SparkSqlAstBuilder { schemaLess) } - /** - * Create location string. - */ - override def visitLocationSpec(ctx: LocationSpecContext): String = { - EximUtil.relativeToAbsolutePath(hiveConf, super.visitLocationSpec(ctx)) - } - /** Empty storage format for default values and copies. */ private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, Map.empty) @@ -402,7 +393,7 @@ class HiveSqlAstBuilder(hiveConf: HiveConf) extends SparkSqlAstBuilder { override def visitGenericFileFormat( ctx: GenericFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { val source = ctx.identifier.getText - HiveSerDe.sourceToSerDe(source, hiveConf) match { + HiveSerDe.sourceToSerDe(source, conf) match { case Some(s) => EmptyStorageFormat.copy( inputFormat = s.inputFormat, From b761a179fd48370d210f3051307339ab3f5f0ba2 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 20 Apr 2016 19:23:14 -0700 Subject: [PATCH 02/13] a checkpoint --- .../org/apache/spark/sql/SQLContext.scala | 16 ----- .../spark/sql/execution/SparkSqlParser.scala | 72 ++++++++++++++++++- .../sql/execution/command/resources.scala | 48 +++++++++++++ .../spark/sql/execution/command/views.scala | 32 +++++++++ .../spark/sql/internal/SessionState.scala | 23 ++++-- .../spark/sql/hive/HiveMetastoreCatalog.scala | 16 ++--- .../spark/sql/hive/HiveSessionState.scala | 2 +- .../sql/hive/execution/HiveSqlParser.scala | 71 +----------------- .../spark/sql/hive/execution/commands.scala | 26 ------- .../spark/sql/hive/HiveDDLCommandSuite.scala | 4 +- 10 files changed, 180 insertions(+), 130 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index f3f84144ad93..d85ddd5a98a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -207,22 +207,6 @@ class 
SQLContext private[sql]( sessionState.addJar(path) } - /** A [[FunctionResourceLoader]] that can be used in SessionCatalog. */ - @transient protected[sql] lazy val functionResourceLoader: FunctionResourceLoader = { - new FunctionResourceLoader { - override def loadResource(resource: FunctionResource): Unit = { - resource.resourceType match { - case JarResource => addJar(resource.uri) - case FileResource => sparkContext.addFile(resource.uri) - case ArchiveResource => - throw new AnalysisException( - "Archive is not allowed to be loaded. If YARN mode is used, " + - "please use --archives options while calling spark-submit.") - } - } - } - } - /** * :: Experimental :: * A collection of methods that are considered experimental, but can be used to hook into diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 8ed6ed21d017..944a08b0ab18 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -17,9 +17,9 @@ package org.apache.spark.sql.execution import scala.collection.JavaConverters._ - import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} @@ -789,4 +789,74 @@ class SparkSqlAstBuilder extends AstBuilder { override def visitConstantList(ctx: ConstantListContext): Seq[String] = withOrigin(ctx) { ctx.constant.asScala.map(visitStringConstant) } + + /** + * Fail an unsupported Hive native command. + */ + override def visitFailNativeCommand( + ctx: FailNativeCommandContext): LogicalPlan = withOrigin(ctx) { + val keywords = if (ctx.kws != null) { + Seq(ctx.kws.kw1, ctx.kws.kw2, ctx.kws.kw3).filter(_ != null).map(_.getText).mkString(" ") + } else { + // SET ROLE is the exception to the rule, because we handle this before other SET commands. + "SET ROLE" + } + throw new ParseException(s"Unsupported operation: $keywords", ctx) + } + + /** + * Create an [[AddJar]] or [[AddFile]] command depending on the requested resource. + */ + override def visitAddResource(ctx: AddResourceContext): LogicalPlan = withOrigin(ctx) { + ctx.identifier.getText.toLowerCase match { + case "file" => AddFile(remainder(ctx.identifier).trim) + case "jar" => AddJar(remainder(ctx.identifier).trim) + case other => throw new ParseException(s"Unsupported resource type '$other'.", ctx) + } + } + + /** + * Create a [[CreateTableLike]] command. + */ + override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) { + val targetTable = visitTableIdentifier(ctx.target) + val sourceTable = visitTableIdentifier(ctx.source) + CreateTableLike(targetTable, sourceTable, ctx.EXISTS != null) + } + + /** + * Create a [[CatalogStorageFormat]] for creating tables. 
+ */ + override def visitCreateFileFormat( + ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { + (ctx.fileFormat, ctx.storageHandler) match { + // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format + case (c: TableFileFormatContext, null) => + visitTableFileFormat(c) + // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO + case (c: GenericFileFormatContext, null) => + visitGenericFileFormat(c) + case (null, storageHandler) => + throw new ParseException("Operation not allowed: ... STORED BY storage_handler ...", ctx) + case _ => + throw new ParseException("expected either STORED AS or STORED BY, not both", ctx) + } + } + + /** Empty storage format for default values and copies. */ + private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, Map.empty) + + /** + * Create a [[CatalogStorageFormat]]. + */ + override def visitTableFileFormat( + ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { + EmptyStorageFormat.copy( + inputFormat = Option(string(ctx.inFmt)), + outputFormat = Option(string(ctx.outFmt)), + serde = Option(ctx.serdeCls).map(string) + ) + } + + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala new file mode 100644 index 000000000000..e7191e4bfe3b --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +/** + * Adds a jar to the current session so it can be used (for UDFs or serdes). + */ +case class AddJar(path: String) extends RunnableCommand { + override val output: Seq[Attribute] = { + val schema = StructType( + StructField("result", IntegerType, nullable = false) :: Nil) + schema.toAttributes + } + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.addJar(path) + Seq(Row(0)) + } +} + +/** + * Adds a file to the current session so it can be used. 
+ */ +case class AddFile(path: String) extends RunnableCommand { + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.sparkContext.addFile(path) + Seq.empty[Row] + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala new file mode 100644 index 000000000000..aa6112c7f043 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode} + +case class CreateViewAsSelectLogicalCommand( + tableDesc: CatalogTable, + child: LogicalPlan, + allowExisting: Boolean, + replace: Boolean, + sql: String) extends UnaryNode with Command { + override def output: Seq[Attribute] = Seq.empty[Attribute] + override lazy val resolved: Boolean = false +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 42915d5887f4..ccfc04eb3218 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -20,11 +20,10 @@ package org.apache.spark.sql.internal import java.util.Properties import scala.collection.JavaConverters._ - import org.apache.spark.internal.config.ConfigEntry -import org.apache.spark.sql.{ContinuousQueryManager, ExperimentalMethods, SQLContext, UDFRegistration} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} -import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, _} import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -56,13 +55,29 @@ private[sql] class SessionState(ctx: SQLContext) { */ lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy() + /** A [[FunctionResourceLoader]] that can be used in SessionCatalog. 
*/ + lazy val functionResourceLoader: FunctionResourceLoader = { + new FunctionResourceLoader { + override def loadResource(resource: FunctionResource): Unit = { + resource.resourceType match { + case JarResource => addJar(resource.uri) + case FileResource => ctx.sparkContext.addFile(resource.uri) + case ArchiveResource => + throw new AnalysisException( + "Archive is not allowed to be loaded. If YARN mode is used, " + + "please use --archives options while calling spark-submit.") + } + } + } + } + /** * Internal catalog for managing table and database states. */ lazy val catalog = new SessionCatalog( ctx.externalCatalog, - ctx.functionResourceLoader, + functionResourceLoader, functionRegistry, conf) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 33a926e4d255..a917e6befe3d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.execution.FileRelation +import org.apache.spark.sql.execution.command.CreateViewAsSelectLogicalCommand import org.apache.spark.sql.execution.datasources.{Partition => _, _} import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation} import org.apache.spark.sql.hive.client._ @@ -698,7 +699,8 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging { case p: LogicalPlan if !p.childrenResolved => p case p: LogicalPlan if p.resolved => p - case CreateViewAsSelect(table, child, allowExisting, replace, sql) if conf.nativeView => + case CreateViewAsSelectLogicalCommand(table, child, allowExisting, replace, sql) + if conf.nativeView => if (allowExisting && replace) { throw new AnalysisException( "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.") @@ -712,7 +714,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging { allowExisting, replace) - case CreateViewAsSelect(table, child, allowExisting, replace, sql) => + case CreateViewAsSelectLogicalCommand(table, child, allowExisting, replace, sql) => HiveNativeCommand(sql) case p @ CreateTableAsSelect(table, child, allowExisting) => @@ -1095,13 +1097,3 @@ private[hive] case class CreateTableAsSelect( tableDesc.storage.outputFormat.isDefined && childrenResolved } - -private[hive] case class CreateViewAsSelect( - tableDesc: CatalogTable, - child: LogicalPlan, - allowExisting: Boolean, - replace: Boolean, - sql: String) extends UnaryNode with Command { - override def output: Seq[Attribute] = Seq.empty[Attribute] - override lazy val resolved: Boolean = false -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 09297c27dc5b..e4c32d795af7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -87,7 +87,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) sharedState.externalCatalog, metadataHive, ctx, - ctx.functionResourceLoader, + ctx.sessionState.functionResourceLoader, functionRegistry, conf, hiveconf) diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala index 4ff02cdbd0b3..03d938d804d9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala @@ -30,8 +30,8 @@ import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkSqlAstBuilder -import org.apache.spark.sql.execution.command.{CreateTable, CreateTableLike} -import org.apache.spark.sql.hive.{CreateTableAsSelect => CTAS, CreateViewAsSelect => CreateView, HiveSerDe} +import org.apache.spark.sql.execution.command.{CreateTable, CreateViewAsSelectLogicalCommand} +import org.apache.spark.sql.hive.{CreateTableAsSelect => CTAS, HiveSerDe} import org.apache.spark.sql.hive.{HiveGenericUDTF, HiveMetastoreTypes, HiveSerDe} import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper @@ -68,31 +68,6 @@ class HiveSqlAstBuilder(hiveConf: HiveConf) extends SparkSqlAstBuilder { HiveNativeCommand(command(ctx)) } - /** - * Fail an unsupported Hive native command. - */ - override def visitFailNativeCommand( - ctx: FailNativeCommandContext): LogicalPlan = withOrigin(ctx) { - val keywords = if (ctx.kws != null) { - Seq(ctx.kws.kw1, ctx.kws.kw2, ctx.kws.kw3).filter(_ != null).map(_.getText).mkString(" ") - } else { - // SET ROLE is the exception to the rule, because we handle this before other SET commands. - "SET ROLE" - } - throw new ParseException(s"Unsupported operation: $keywords", ctx) - } - - /** - * Create an [[AddJar]] or [[AddFile]] command depending on the requested resource. - */ - override def visitAddResource(ctx: AddResourceContext): LogicalPlan = withOrigin(ctx) { - ctx.identifier.getText.toLowerCase match { - case "file" => AddFile(remainder(ctx.identifier).trim) - case "jar" => AddJar(remainder(ctx.identifier).trim) - case other => throw new ParseException(s"Unsupported resource type '$other'.", ctx) - } - } - /** * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other * options are passed on to Hive) e.g.: @@ -110,25 +85,6 @@ class HiveSqlAstBuilder(hiveConf: HiveConf) extends SparkSqlAstBuilder { } } - /** - * Create a [[CatalogStorageFormat]] for creating tables. - */ - override def visitCreateFileFormat( - ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { - (ctx.fileFormat, ctx.storageHandler) match { - // Expected format: INPUTFORMAT input_format OUTPUTFORMAT output_format - case (c: TableFileFormatContext, null) => - visitTableFileFormat(c) - // Expected format: SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO - case (c: GenericFileFormatContext, null) => - visitGenericFileFormat(c) - case (null, storageHandler) => - throw new ParseException("Operation not allowed: ... STORED BY storage_handler ...", ctx) - case _ => - throw new ParseException("expected either STORED AS or STORED BY, not both", ctx) - } - } - /** * Create a table, returning either a [[CreateTable]] or a [[CreateTableAsSelect]]. * @@ -224,15 +180,6 @@ class HiveSqlAstBuilder(hiveConf: HiveConf) extends SparkSqlAstBuilder { } } - /** - * Create a [[CreateTableLike]] command. 
- */ - override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) { - val targetTable = visitTableIdentifier(ctx.target) - val sourceTable = visitTableIdentifier(ctx.source) - CreateTableLike(targetTable, sourceTable, ctx.EXISTS != null) - } - /** * Create or replace a view. This creates a [[CreateViewAsSelect]] command. * @@ -303,7 +250,7 @@ class HiveSqlAstBuilder(hiveConf: HiveConf) extends SparkSqlAstBuilder { viewOriginalText = sql, viewText = sql, comment = comment) - CreateView(tableDesc, plan(query), allowExist, replace, command(ctx)) + CreateViewAsSelectLogicalCommand(tableDesc, plan(query), allowExist, replace, command(ctx)) } /** @@ -384,18 +331,6 @@ class HiveSqlAstBuilder(hiveConf: HiveConf) extends SparkSqlAstBuilder { /** Empty storage format for default values and copies. */ private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, Map.empty) - /** - * Create a [[CatalogStorageFormat]]. - */ - override def visitTableFileFormat( - ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { - EmptyStorageFormat.copy( - inputFormat = Option(string(ctx.inFmt)), - outputFormat = Option(string(ctx.outFmt)), - serde = Option(ctx.serdeCls).map(string) - ) - } - /** * Resolve a [[HiveSerDe]] based on the name given and return it as a [[CatalogStorageFormat]]. */ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index b5ee9a62954c..78f8bfe59f79 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -123,32 +123,6 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand { } } -private[hive] -case class AddJar(path: String) extends RunnableCommand { - - override val output: Seq[Attribute] = { - val schema = StructType( - StructField("result", IntegerType, false) :: Nil) - schema.toAttributes - } - - override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.addJar(path) - - Seq(Row(0)) - } -} - -private[hive] -case class AddFile(path: String) extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.sessionState.runNativeSql(s"ADD FILE $path") - sqlContext.sparkContext.addFile(path) - Seq.empty[Row] - } -} - private[hive] case class CreateMetastoreDataSource( tableIdent: TableIdentifier, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 484cf528e6db..f7c47300313d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation} -import org.apache.spark.sql.execution.command.{CreateTable, CreateTableLike} +import org.apache.spark.sql.execution.command.{CreateTable, CreateTableLike, CreateViewAsSelectLogicalCommand} import org.apache.spark.sql.hive.execution.HiveNativeCommand import org.apache.spark.sql.hive.test.TestHive @@ -40,7 +40,7 @@ class HiveDDLCommandSuite extends PlanTest { parser.parsePlan(sql).collect { case CreateTable(desc, 
allowExisting) => (desc, allowExisting) case CreateTableAsSelect(desc, _, allowExisting) => (desc, allowExisting) - case CreateViewAsSelect(desc, _, allowExisting, _, _) => (desc, allowExisting) + case CreateViewAsSelectLogicalCommand(desc, _, allowExisting, _, _) => (desc, allowExisting) }.head } From 0b58e18a46066fee2c13fa520f590769b35d51a0 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 20 Apr 2016 19:30:07 -0700 Subject: [PATCH 03/13] Fix tests --- .../org/apache/spark/sql/hive/execution/HiveSqlParser.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala index 8f270f762852..90f10d5ebdc6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala @@ -359,7 +359,7 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder { } val (inFormat, inSerdeClass, inSerdeProps, reader) = - format(inRowFormat, "hive.script.serde") + format(inRowFormat, "hive.script.recordreader") val (outFormat, outSerdeClass, outSerdeProps, writer) = format(outRowFormat, "hive.script.recordwriter") From 25da47dfb040e0936eda8bfd5b282e1d3e094b5a Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 20 Apr 2016 20:44:36 -0700 Subject: [PATCH 04/13] fix style --- .../main/scala/org/apache/spark/sql/internal/SessionState.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 1819f01772c8..08a99627bf44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.internal import java.util.Properties import scala.collection.JavaConverters._ + import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} From c8708f7e9395811c9796bcbba68f63243bdda6cc Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 20 Apr 2016 22:54:40 -0700 Subject: [PATCH 05/13] fix test --- .../spark/sql/execution/SparkSqlParser.scala | 97 ++++++++++++++++++- .../spark/sql/execution/command/tables.scala | 19 ++++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 21 +--- .../spark/sql/hive/HiveDDLCommandSuite.scala | 4 +- 4 files changed, 120 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 9ce8274c849b..ac12a72fc670 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageForma import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} -import org.apache.spark.sql.execution.command.{CreateViewAsSelectLogicalCommand, DescribeCommand => _, _} +import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, DescribeCommand 
=> _, _} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -820,6 +820,101 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } } + /** + * Create a table, returning either a [[CreateTable]] or a [[CreateTableAsSelectLogicalPlan]]. + * + * This is not used to create datasource tables, which is handled through + * "CREATE TABLE ... USING ...". + * + * Note: several features are currently not supported - temporary tables, bucketing, + * skewed columns and storage handlers (STORED BY). + * + * Expected format: + * {{{ + * CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name + * [(col1 data_type [COMMENT col_comment], ...)] + * [COMMENT table_comment] + * [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)] + * [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS] + * [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) [STORED AS DIRECTORIES]] + * [ROW FORMAT row_format] + * [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]] + * [LOCATION path] + * [TBLPROPERTIES (property_name=property_value, ...)] + * [AS select_statement]; + * }}} + */ + override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) { + val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) + // TODO: implement temporary tables + if (temp) { + throw new ParseException( + "CREATE TEMPORARY TABLE is not supported yet. " + + "Please use registerTempTable as an alternative.", ctx) + } + if (ctx.skewSpec != null) { + throw new ParseException("Operation not allowed: CREATE TABLE ... SKEWED BY ...", ctx) + } + if (ctx.bucketSpec != null) { + throw new ParseException("Operation not allowed: CREATE TABLE ... CLUSTERED BY ...", ctx) + } + val tableType = if (external) { + CatalogTableType.EXTERNAL_TABLE + } else { + CatalogTableType.MANAGED_TABLE + } + val comment = Option(ctx.STRING).map(string) + val partitionCols = Option(ctx.partitionColumns).toSeq.flatMap(visitCatalogColumns) + val cols = Option(ctx.columns).toSeq.flatMap(visitCatalogColumns) + val properties = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty) + val selectQuery = Option(ctx.query).map(plan) + + // Note: Hive requires partition columns to be distinct from the schema, so we need + // to include the partition columns here explicitly + val schema = cols ++ partitionCols + + // Storage format + val defaultStorage: CatalogStorageFormat = { + val defaultStorageType = conf.getConfString("hive.default.fileformat", "textfile") + val defaultHiveSerde = HiveSerDe.sourceToSerDe(defaultStorageType, conf) + CatalogStorageFormat( + locationUri = None, + inputFormat = defaultHiveSerde.flatMap(_.inputFormat) + .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")), + outputFormat = defaultHiveSerde.flatMap(_.outputFormat) + .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), + // Note: Keep this unspecified because we use the presence of the serde to decide + // whether to convert a table created by CTAS to a datasource table. 
+ serde = None, + serdeProperties = Map()) + } + val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat) + .getOrElse(EmptyStorageFormat) + val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat) + val location = Option(ctx.locationSpec).map(visitLocationSpec) + val storage = CatalogStorageFormat( + locationUri = location, + inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat), + outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat), + serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde), + serdeProperties = rowStorage.serdeProperties ++ fileStorage.serdeProperties) + + // TODO support the sql text - have a proper location for this! + val tableDesc = CatalogTable( + identifier = name, + tableType = tableType, + storage = storage, + schema = schema, + partitionColumnNames = partitionCols.map(_.name), + properties = properties, + comment = comment) + + selectQuery match { + case Some(q) => CreateTableAsSelectLogicalPlan(tableDesc, q, ifNotExists) + case None => CreateTable(tableDesc, ifNotExists) + } + } + /** * Create a [[CreateTableLike]] command. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 0b419851746a..9a7c11ac331d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -20,6 +20,25 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode} + + +case class CreateTableAsSelectLogicalPlan( + tableDesc: CatalogTable, + child: LogicalPlan, + allowExisting: Boolean) extends UnaryNode with Command { + + override def output: Seq[Attribute] = Seq.empty[Attribute] + + override lazy val resolved: Boolean = + tableDesc.identifier.database.isDefined && + tableDesc.schema.nonEmpty && + tableDesc.storage.serde.isDefined && + tableDesc.storage.inputFormat.isDefined && + tableDesc.storage.outputFormat.isDefined && + childrenResolved +} /** * A command to create a table with the same definition of the given existing table. 
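
The storage resolution in visitCreateTable above leans on Option.orElse for precedence: an explicit ROW FORMAT or STORED AS clause wins, and the Hive default file format only fills the remaining holes. A toy, self-contained illustration of that precedence in plain Scala (not Spark API):

    // No STORED AS clause was given, so the input format falls through to the default.
    val fileInputFormat: Option[String] = None
    val defaultInputFormat: Option[String] = Some("org.apache.hadoop.mapred.TextInputFormat")

    val resolvedInputFormat = fileInputFormat.orElse(defaultInputFormat)
    // resolvedInputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")
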
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 8e7862c5e517..3eea6c06ac7b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -40,13 +40,13 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.execution.FileRelation -import org.apache.spark.sql.execution.command.CreateViewAsSelectLogicalCommand +import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand} import org.apache.spark.sql.execution.datasources.{Partition => _, _} import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation} import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.hive.execution.HiveNativeCommand import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource} -import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} +import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.sql.types._ @@ -657,7 +657,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging { case CreateViewAsSelectLogicalCommand(table, child, allowExisting, replace, sql) => HiveNativeCommand(sql) - case p @ CreateTableAsSelect(table, child, allowExisting) => + case p @ CreateTableAsSelectLogicalPlan(table, child, allowExisting) => val schema = if (table.schema.nonEmpty) { table.schema } else { @@ -1022,18 +1022,3 @@ private[hive] object HiveMetastoreTypes { case udt: UserDefinedType[_] => toMetastoreType(udt.sqlType) } } - -private[hive] case class CreateTableAsSelect( - tableDesc: CatalogTable, - child: LogicalPlan, - allowExisting: Boolean) extends UnaryNode with Command { - - override def output: Seq[Attribute] = Seq.empty[Attribute] - override lazy val resolved: Boolean = - tableDesc.identifier.database.isDefined && - tableDesc.schema.nonEmpty && - tableDesc.storage.serde.isDefined && - tableDesc.storage.inputFormat.isDefined && - tableDesc.storage.outputFormat.isDefined && - childrenResolved -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index f7c47300313d..5a306210b39f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation} -import org.apache.spark.sql.execution.command.{CreateTable, CreateTableLike, CreateViewAsSelectLogicalCommand} +import org.apache.spark.sql.execution.command.{CreateTable, CreateTableLike, CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand} import org.apache.spark.sql.hive.execution.HiveNativeCommand import org.apache.spark.sql.hive.test.TestHive @@ -39,7 +39,7 @@ class HiveDDLCommandSuite extends PlanTest { private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { parser.parsePlan(sql).collect { case CreateTable(desc, allowExisting) => (desc, allowExisting) - 
case CreateTableAsSelect(desc, _, allowExisting) => (desc, allowExisting) + case CreateTableAsSelectLogicalPlan(desc, _, allowExisting) => (desc, allowExisting) case CreateViewAsSelectLogicalCommand(desc, _, allowExisting, _, _) => (desc, allowExisting) }.head } From b4734af0297b18a4379a1389da450dcc78ce5668 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 20 Apr 2016 23:08:27 -0700 Subject: [PATCH 06/13] style --- .../scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 5a306210b39f..4c90dbeb1bac 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation} -import org.apache.spark.sql.execution.command.{CreateTable, CreateTableLike, CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand} +import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand} import org.apache.spark.sql.hive.execution.HiveNativeCommand import org.apache.spark.sql.hive.test.TestHive From c878aef1ea7bb61e29961253a2e3510954b57348 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 20 Apr 2016 23:15:44 -0700 Subject: [PATCH 07/13] [SPARK-14795][SQL] Remove the use of Hive's variable substitution --- .../org/apache/spark/sql/hive/HiveSessionState.scala | 3 +-- .../spark/sql/hive/execution/HiveSqlParser.scala | 12 +++++------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 171def43b570..6f4332c65f93 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -21,7 +21,6 @@ import java.util.regex.Pattern import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hadoop.hive.ql.parse.VariableSubstitution import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.Analyzer @@ -109,7 +108,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) /** * Parser for HiveQl query texts. */ - override lazy val sqlParser: ParserInterface = new HiveSqlParser(conf, hiveconf) + override lazy val sqlParser: ParserInterface = new HiveSqlParser(conf) /** * Planner that takes into account Hive-specific strategies. 
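
This patch ([SPARK-14795]) swaps Hive's VariableSubstitution for a SQLConf-backed implementation, so the parser no longer needs a HiveConf at all. A minimal sketch of the resulting wiring, mirroring the constructor and substitute call in the diff that follows; the SketchParser wrapper itself is only illustrative:

    import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}

    class SketchParser(conf: SQLConf) {
      // Substitution is applied to the raw SQL text before it reaches the ANTLR parser.
      private val substitutor = new VariableSubstitution(conf)
      def preprocess(sqlText: String): String = substitutor.substitute(sqlText)
    }
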
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala index 00f829d85029..121890ccddb4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala @@ -20,8 +20,6 @@ package org.apache.spark.sql.hive.execution import scala.util.Try import org.antlr.v4.runtime.Token -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.ql.parse.VariableSubstitution import org.apache.hadoop.hive.serde.serdeConstants import org.apache.spark.sql.catalyst.catalog._ @@ -29,23 +27,23 @@ import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkSqlAstBuilder -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} /** * Concrete parser for HiveQl statements. */ -class HiveSqlParser(conf: SQLConf, hiveconf: HiveConf) extends AbstractSqlParser { +class HiveSqlParser(conf: SQLConf) extends AbstractSqlParser { val astBuilder = new HiveSqlAstBuilder(conf) - lazy val substitutor = new VariableSubstitution + private val substitutor = new VariableSubstitution(conf) protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = { - super.parse(substitutor.substitute(hiveconf, command))(toResult) + super.parse(substitutor.substitute(command))(toResult) } protected override def nativeCommand(sqlText: String): LogicalPlan = { - HiveNativeCommand(substitutor.substitute(hiveconf, sqlText)) + HiveNativeCommand(substitutor.substitute(sqlText)) } } From d8137838dde92a6ad00e17cbd5aa2745881583d0 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 20 Apr 2016 23:48:23 -0700 Subject: [PATCH 08/13] [SPARK-14798][SQL] Move native command and script transformation parsing into SparkSqlAstBuilder --- .../plans/logical/ScriptTransformation.scala | 64 ++++++++++++- .../spark/sql/execution/SparkSqlParser.scala | 95 ++++++++++++++++++- .../command}/HiveNativeCommand.scala | 11 +-- .../spark/sql/hive/HiveMetastoreCatalog.scala | 3 +- .../spark/sql/hive/HiveQueryExecution.scala | 4 +- .../spark/sql/hive/HiveStrategies.scala | 5 +- .../apache/spark/sql/hive/SQLBuilder.scala | 10 +- .../sql/hive/execution/HiveSqlParser.scala | 84 +--------------- .../hive/execution/ScriptTransformation.scala | 67 ++++--------- .../apache/spark/sql/hive/test/TestHive.scala | 3 +- .../spark/sql/hive/HiveDDLCommandSuite.scala | 3 +- .../hive/execution/HiveComparisonTest.scala | 2 +- 12 files changed, 190 insertions(+), 161 deletions(-) rename sql/{hive/src/main/scala/org/apache/spark/sql/hive/execution => core/src/main/scala/org/apache/spark/sql/execution/command}/HiveNativeCommand.scala (82%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala index 578027da776e..0be12fb96a28 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala @@ -37,7 +37,65 @@ case class ScriptTransformation( } /** - * A placeholder for implementation specific input and output properties 
when passing data - * to a script. For example, in Hive this would specify which SerDes to use. + * Input and output properties when passing data to a script. + * For example, in Hive this would specify which SerDes to use. */ -trait ScriptInputOutputSchema +case class ScriptInputOutputSchema( + inputRowFormat: Seq[(String, String)], + outputRowFormat: Seq[(String, String)], + inputSerdeClass: Option[String], + outputSerdeClass: Option[String], + inputSerdeProps: Seq[(String, String)], + outputSerdeProps: Seq[(String, String)], + recordReaderClass: Option[String], + recordWriterClass: Option[String], + schemaLess: Boolean) { + + def inputRowFormatSQL: Option[String] = + getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps) + + def outputRowFormatSQL: Option[String] = + getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps) + + /** + * Get the row format specification + * Note: + * 1. Changes are needed when readerClause and writerClause are supported. + * 2. Changes are needed when "ESCAPED BY" is supported. + */ + private def getRowFormatSQL( + rowFormat: Seq[(String, String)], + serdeClass: Option[String], + serdeProps: Seq[(String, String)]): Option[String] = { + if (schemaLess) return Some("") + + val rowFormatDelimited = + rowFormat.map { + case ("TOK_TABLEROWFORMATFIELD", value) => + "FIELDS TERMINATED BY " + value + case ("TOK_TABLEROWFORMATCOLLITEMS", value) => + "COLLECTION ITEMS TERMINATED BY " + value + case ("TOK_TABLEROWFORMATMAPKEYS", value) => + "MAP KEYS TERMINATED BY " + value + case ("TOK_TABLEROWFORMATLINES", value) => + "LINES TERMINATED BY " + value + case ("TOK_TABLEROWFORMATNULL", value) => + "NULL DEFINED AS " + value + case o => return None + } + + val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("") + val serdePropsSQL = + if (serdeClass.nonEmpty) { + val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ") + if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else "" + } else { + "" + } + if (rowFormat.nonEmpty) { + Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" ")) + } else { + Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL) + } + } +} \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index ac12a72fc670..05fb1ef63161 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import scala.collection.JavaConverters._ +import scala.util.Try import org.antlr.v4.runtime.{ParserRuleContext, Token} @@ -26,16 +27,27 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema} import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, DescribeCommand => _, _} import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} +import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, 
VariableSubstitution} + /** * Concrete parser for Spark SQL statements. */ -class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser{ +class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser { val astBuilder = new SparkSqlAstBuilder(conf) + + private val substitutor = new VariableSubstitution(conf) + + protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = { + super.parse(substitutor.substitute(command))(toResult) + } + + protected override def nativeCommand(sqlText: String): LogicalPlan = { + HiveNativeCommand(substitutor.substitute(sqlText)) + } } /** @@ -44,6 +56,14 @@ class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser{ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { import org.apache.spark.sql.catalyst.parser.ParserUtils._ + /** + * Pass a command to Hive using a [[HiveNativeCommand]]. + */ + override def visitExecuteNativeCommand( + ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) { + HiveNativeCommand(command(ctx)) + } + /** * Create a [[SetCommand]] logical plan. * @@ -1127,4 +1147,73 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { Option(col.STRING).map(string)) } } + + /** + * Create a [[ScriptInputOutputSchema]]. + */ + override protected def withScriptIOSchema( + ctx: QuerySpecificationContext, + inRowFormat: RowFormatContext, + recordWriter: Token, + outRowFormat: RowFormatContext, + recordReader: Token, + schemaLess: Boolean): ScriptInputOutputSchema = { + if (recordWriter != null || recordReader != null) { + throw new ParseException( + "Unsupported operation: Used defined record reader/writer classes.", ctx) + } + + // Decode and input/output format. + type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) + def format(fmt: RowFormatContext, configKey: String): Format = fmt match { + case c: RowFormatDelimitedContext => + // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema + // expects a seq of pairs in which the old parsers' token names are used as keys. + // Transforming the result of visitRowFormatDelimited would be quite a bit messier than + // retrieving the key value pairs ourselves. + def entry(key: String, value: Token): Seq[(String, String)] = { + Option(value).map(t => key -> t.getText).toSeq + } + val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++ + entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++ + entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++ + entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++ + entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs) + + (entries, None, Seq.empty, None) + + case c: RowFormatSerdeContext => + // Use a serde format. + val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c) + + // SPARK-10310: Special cases LazySimpleSerDe + val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { + Try(conf.getConfString(configKey)).toOption + } else { + None + } + (Seq.empty, Option(name), props.toSeq, recordHandler) + + case null => + // Use default (serde) format. 
+ val name = conf.getConfString("hive.script.serde", + "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") + val props = Seq("field.delim" -> "\t") + val recordHandler = Try(conf.getConfString(configKey)).toOption + (Nil, Option(name), props, recordHandler) + } + + val (inFormat, inSerdeClass, inSerdeProps, reader) = + format(inRowFormat, "hive.script.recordreader") + + val (outFormat, outSerdeClass, outSerdeProps, writer) = + format(outRowFormat, "hive.script.recordwriter") + + ScriptInputOutputSchema( + inFormat, outFormat, + inSerdeClass, outSerdeClass, + inSerdeProps, outSerdeProps, + reader, writer, + schemaLess) + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala similarity index 82% rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala index 8c1f4a8dc513..39e441f1c3be 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/HiveNativeCommand.scala @@ -15,22 +15,21 @@ * limitations under the License. */ -package org.apache.spark.sql.hive.execution +package org.apache.spark.sql.execution.command import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.execution.command.RunnableCommand -import org.apache.spark.sql.hive.HiveSessionState import org.apache.spark.sql.types.StringType -private[hive] +/** + * A command that we delegate to Hive. Eventually we should remove this. + */ case class HiveNativeCommand(sql: String) extends RunnableCommand { override def output: Seq[AttributeReference] = Seq(AttributeReference("result", StringType, nullable = false)()) override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.sessionState.asInstanceOf[HiveSessionState].runNativeSql(sql).map(Row(_)) + sqlContext.sessionState.runNativeSql(sql).map(Row(_)) } - } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 3eea6c06ac7b..2910f64d43be 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -40,11 +40,10 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.execution.FileRelation -import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand} +import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, HiveNativeCommand} import org.apache.spark.sql.execution.datasources.{Partition => _, _} import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation} import org.apache.spark.sql.hive.client._ -import org.apache.spark.sql.hive.execution.HiveNativeCommand import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource} import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.sql.types._ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala index 1c1bfb610c29..0ee34f07fd03 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.hive import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand} -import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand} +import org.apache.spark.sql.execution.command.{ExecutedCommand, HiveNativeCommand, SetCommand} +import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index bbdcc8c6c2ff..8720e54ed665 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -38,8 +38,9 @@ private[hive] trait HiveStrategies { object Scripts extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case logical.ScriptTransformation(input, script, output, child, schema: HiveScriptIOSchema) => - ScriptTransformation(input, script, output, planLater(child), schema)(hiveconf) :: Nil + case logical.ScriptTransformation(input, script, output, child, ioschema) => + val hiveIoSchema = HiveScriptIOSchema(ioschema) + ScriptTransformation(input, script, output, planLater(child), hiveIoSchema)(hiveconf) :: Nil case _ => Nil } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index 2d44813f0eac..86115d0e9bbb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.hive.execution.HiveScriptIOSchema import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType} /** @@ -210,13 +209,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi } private def scriptTransformationToSQL(plan: ScriptTransformation): String = { - val ioSchema = plan.ioschema.asInstanceOf[HiveScriptIOSchema] - val inputRowFormatSQL = ioSchema.inputRowFormatSQL.getOrElse( + val inputRowFormatSQL = plan.ioschema.inputRowFormatSQL.getOrElse( throw new UnsupportedOperationException( - s"unsupported row format ${ioSchema.inputRowFormat}")) - val outputRowFormatSQL = ioSchema.outputRowFormatSQL.getOrElse( + s"unsupported row format ${plan.ioschema.inputRowFormat}")) + val outputRowFormatSQL = plan.ioschema.outputRowFormatSQL.getOrElse( throw new UnsupportedOperationException( - s"unsupported row format ${ioSchema.outputRowFormat}")) + s"unsupported row format ${plan.ioschema.outputRowFormat}")) val outputSchema = plan.output.map { attr => s"${attr.sql} ${attr.dataType.simpleString}" diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala index 121890ccddb4..98504ca6d13f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveSqlParser.scala @@ -17,16 +17,11 @@ package org.apache.spark.sql.hive.execution -import scala.util.Try - -import org.antlr.v4.runtime.Token -import org.apache.hadoop.hive.serde.serdeConstants - -import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkSqlAstBuilder +import org.apache.spark.sql.execution.command.HiveNativeCommand import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} /** @@ -54,14 +49,6 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) { import ParserUtils._ - /** - * Pass a command to Hive using a [[HiveNativeCommand]]. - */ - override def visitExecuteNativeCommand( - ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) { - HiveNativeCommand(command(ctx)) - } - /** * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other * options are passed on to Hive) e.g.: @@ -78,73 +65,4 @@ class HiveSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) { HiveNativeCommand(command(ctx)) } } - - /** - * Create a [[HiveScriptIOSchema]]. - */ - override protected def withScriptIOSchema( - ctx: QuerySpecificationContext, - inRowFormat: RowFormatContext, - recordWriter: Token, - outRowFormat: RowFormatContext, - recordReader: Token, - schemaLess: Boolean): HiveScriptIOSchema = { - if (recordWriter != null || recordReader != null) { - throw new ParseException( - "Unsupported operation: Used defined record reader/writer classes.", ctx) - } - - // Decode and input/output format. - type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) - def format(fmt: RowFormatContext, configKey: String): Format = fmt match { - case c: RowFormatDelimitedContext => - // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema - // expects a seq of pairs in which the old parsers' token names are used as keys. - // Transforming the result of visitRowFormatDelimited would be quite a bit messier than - // retrieving the key value pairs ourselves. - def entry(key: String, value: Token): Seq[(String, String)] = { - Option(value).map(t => key -> t.getText).toSeq - } - val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++ - entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++ - entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++ - entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++ - entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs) - - (entries, None, Seq.empty, None) - - case c: RowFormatSerdeContext => - // Use a serde format. - val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c) - - // SPARK-10310: Special cases LazySimpleSerDe - val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { - Try(conf.getConfString(configKey)).toOption - } else { - None - } - (Seq.empty, Option(name), props.toSeq, recordHandler) - - case null => - // Use default (serde) format. 
- val name = conf.getConfString("hive.script.serde", - "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") - val props = Seq(serdeConstants.FIELD_DELIM -> "\t") - val recordHandler = Try(conf.getConfString(configKey)).toOption - (Nil, Option(name), props, recordHandler) - } - - val (inFormat, inSerdeClass, inSerdeProps, reader) = - format(inRowFormat, "hive.script.recordreader") - - val (outFormat, outSerdeClass, outSerdeProps, writer) = - format(outRowFormat, "hive.script.recordwriter") - - HiveScriptIOSchema( - inFormat, outFormat, - inSerdeClass, outSerdeClass, - inSerdeProps, outSerdeProps, - reader, writer, - schemaLess) - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 2f7cec354d84..8c8becfb8752 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -312,6 +312,22 @@ private class ScriptTransformationWriterThread( } } +private[hive] +object HiveScriptIOSchema { + def apply(input: ScriptInputOutputSchema): HiveScriptIOSchema = { + HiveScriptIOSchema( + input.inputRowFormat, + input.outputRowFormat, + input.inputSerdeClass, + input.outputSerdeClass, + input.inputSerdeProps, + input.outputSerdeProps, + input.recordReaderClass, + input.recordWriterClass, + input.schemaLess) + } +} + /** * The wrapper class of Hive input and output schema properties */ @@ -325,7 +341,8 @@ case class HiveScriptIOSchema ( outputSerdeProps: Seq[(String, String)], recordReaderClass: Option[String], recordWriterClass: Option[String], - schemaLess: Boolean) extends ScriptInputOutputSchema with HiveInspectors { + schemaLess: Boolean) + extends HiveInspectors { private val defaultFormat = Map( ("TOK_TABLEROWFORMATFIELD", "\t"), @@ -402,52 +419,4 @@ case class HiveScriptIOSchema ( instance } } - - def inputRowFormatSQL: Option[String] = - getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps) - - def outputRowFormatSQL: Option[String] = - getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps) - - /** - * Get the row format specification - * Note: - * 1. Changes are needed when readerClause and writerClause are supported. - * 2. Changes are needed when "ESCAPED BY" is supported. 
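// [Editor's aside, not part of the patch] A REPL-style sketch of the row-format SQL
// produced by the relocated ScriptInputOutputSchema introduced earlier in this
// series, which SQLBuilder now consumes directly instead of casting to
// HiveScriptIOSchema. All values below are made up for illustration.
import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema

val ioSchema = ScriptInputOutputSchema(
  inputRowFormat = Seq("TOK_TABLEROWFORMATFIELD" -> "','"),
  outputRowFormat = Nil,
  inputSerdeClass = None,
  outputSerdeClass = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
  inputSerdeProps = Nil,
  outputSerdeProps = Seq("field.delim" -> "\t"),
  recordReaderClass = None,
  recordWriterClass = None,
  schemaLess = false)

// Delimited input format:
//   Some("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
ioSchema.inputRowFormatSQL
// Serde output format ('\t' is a literal tab character):
//   Some("ROW FORMAT SERDE '...LazySimpleSerDe' WITH SERDEPROPERTIES('field.delim' = '\t')")
ioSchema.outputRowFormatSQL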
- */ - private def getRowFormatSQL( - rowFormat: Seq[(String, String)], - serdeClass: Option[String], - serdeProps: Seq[(String, String)]): Option[String] = { - if (schemaLess) return Some("") - - val rowFormatDelimited = - rowFormat.map { - case ("TOK_TABLEROWFORMATFIELD", value) => - "FIELDS TERMINATED BY " + value - case ("TOK_TABLEROWFORMATCOLLITEMS", value) => - "COLLECTION ITEMS TERMINATED BY " + value - case ("TOK_TABLEROWFORMATMAPKEYS", value) => - "MAP KEYS TERMINATED BY " + value - case ("TOK_TABLEROWFORMATLINES", value) => - "LINES TERMINATED BY " + value - case ("TOK_TABLEROWFORMATNULL", value) => - "NULL DEFINED AS " + value - case o => return None - } - - val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("") - val serdePropsSQL = - if (serdeClass.nonEmpty) { - val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ") - if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else "" - } else { - "" - } - if (rowFormat.nonEmpty) { - Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" ")) - } else { - Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL) - } - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 2bb13996c145..741e3bdd18b0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -39,10 +39,9 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions.ExpressionInfo import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.command.CacheTableCommand +import org.apache.spark.sql.execution.command.{CacheTableCommand, HiveNativeCommand} import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.hive.execution.HiveNativeCommand import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.{ShutdownHookManager, Utils} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 4c90dbeb1bac..e3522567b948 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -29,8 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation} -import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand} -import org.apache.spark.sql.hive.execution.HiveNativeCommand +import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewAsSelectLogicalCommand, HiveNativeCommand} import org.apache.spark.sql.hive.test.TestHive class HiveDDLCommandSuite extends PlanTest { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 994dc4a2d28f..77906ef2b076 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -28,7 +28,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.execution.command.{ExplainCommand, SetCommand} +import org.apache.spark.sql.execution.command.{ExplainCommand, HiveNativeCommand, SetCommand} import org.apache.spark.sql.execution.datasources.DescribeCommand import org.apache.spark.sql.hive.{InsertIntoHiveTable => LogicalInsertIntoHiveTable} import org.apache.spark.sql.hive.SQLBuilder From 5e99598b3706149e3e53861bb20991a1165b33e5 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 20 Apr 2016 23:50:15 -0700 Subject: [PATCH 09/13] style fix --- .../spark/sql/catalyst/plans/logical/ScriptTransformation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala index 0be12fb96a28..e176e9b82bf3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala @@ -98,4 +98,4 @@ case class ScriptInputOutputSchema( Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL) } } -} \ No newline at end of file +} From 737ea417009f318109f4288c11d899f2987322c4 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 21 Apr 2016 00:23:06 -0700 Subject: [PATCH 10/13] Fix test --- .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 6b71e59b7359..34572ad480cc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -512,7 +512,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { sql("SELECT key FROM ${hiveconf:tbl} ORDER BY key, value limit 1"), sql("SELECT key FROM src ORDER BY key, value limit 1").collect().toSeq) - sql("set hive.variable.substitute=false") // disable the substitution + sql("set spark.sql.variable.substitute=false") // disable the substitution sql("set tbl2=src") intercept[Exception] { sql("SELECT key FROM ${hiveconf:tbl2} ORDER BY key, value limit 1").collect() From ecc9ea758a1040464cf6ffec82c4c14242f5beff Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 21 Apr 2016 00:29:53 -0700 Subject: [PATCH 11/13] native --- .../test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index adc7af32ca03..0d79c1381c57 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -21,6 +21,7 @@ import scala.reflect.ClassTag import org.apache.spark.sql.{QueryTest, Row} import 
org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.HiveNativeCommand import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.hive.test.TestHiveSingleton From f74b38126a3273c0ab0fbe5c16d9d14a6d5a045a Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 21 Apr 2016 09:52:40 -0700 Subject: [PATCH 12/13] Fix test --- .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 3beaadbc5907..345ee8ef28ea 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -518,7 +518,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { sql("SELECT key FROM ${hiveconf:tbl2} ORDER BY key, value limit 1").collect() } - sql("set hive.variable.substitute=true") // enable the substitution + sql("set spark.sql.variable.substitute=true") // enable the substitution checkAnswer( sql("SELECT key FROM ${hiveconf:tbl2} ORDER BY key, value limit 1"), sql("SELECT key FROM src ORDER BY key, value limit 1").collect().toSeq) From 1356aa08a4b86766913c7f4ab2c8c7ce292454dc Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 21 Apr 2016 13:50:01 -0700 Subject: [PATCH 13/13] fix test --- .../spark/sql/internal/SessionState.scala | 2 +- .../execution/command/DDLCommandSuite.scala | 20 ------------------- 2 files changed, 1 insertion(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 08a99627bf44..36aec2de8dfb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -165,7 +165,7 @@ private[sql] class SessionState(ctx: SQLContext) { } def runNativeSql(sql: String): Seq[String] = { - throw new UnsupportedOperationException + throw new AnalysisException("Unsupported query: " + sql) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index e99eb02252af..a1ffda965625 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -611,26 +611,6 @@ class DDLCommandSuite extends PlanTest { comparePlans(parsed2, expected2) } - test("unsupported operations") { - intercept[ParseException] { - parser.parsePlan("DROP TABLE tab PURGE") - } - intercept[ParseException] { - parser.parsePlan("DROP TABLE tab FOR REPLICATION('eventid')") - } - intercept[ParseException] { - parser.parsePlan( - """ - |CREATE EXTERNAL TABLE oneToTenDef - |USING org.apache.spark.sql.sources - |OPTIONS (from '1', to '10') - """.stripMargin) - } - intercept[ParseException] { - parser.parsePlan("SELECT TRANSFORM (key, value) USING 'cat' AS (tKey, tValue) FROM testData") - } - } - test("SPARK-14383: DISTRIBUTE and UNSET as non-keywords") { val sql = "SELECT distribute, unset FROM x" val parsed = parser.parsePlan(sql)
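// [Editor's aside, not part of the patch] A minimal sketch of the behaviour the
// updated SQLQuerySuite tests exercise: variable substitution is now controlled by
// spark.sql.variable.substitute rather than hive.variable.substitute. Assumes a
// TestHive-style session where sql(...) is in scope and a table named src exists.
sql("set tbl=src")
sql("SELECT key FROM ${hiveconf:tbl} ORDER BY key, value limit 1")   // substituted to src

sql("set spark.sql.variable.substitute=false")
sql("SELECT key FROM ${hiveconf:tbl} ORDER BY key, value limit 1")   // no substitution, query fails

sql("set spark.sql.variable.substitute=true")
sql("SELECT key FROM ${hiveconf:tbl} ORDER BY key, value limit 1")   // substituted again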