@@ -37,7 +37,65 @@ case class ScriptTransformation(
}

/**
* A placeholder for implementation specific input and output properties when passing data
* to a script. For example, in Hive this would specify which SerDes to use.
* Input and output properties when passing data to a script.
* For example, in Hive this would specify which SerDes to use.
*/
trait ScriptInputOutputSchema
case class ScriptInputOutputSchema(
inputRowFormat: Seq[(String, String)],
outputRowFormat: Seq[(String, String)],
inputSerdeClass: Option[String],
outputSerdeClass: Option[String],
inputSerdeProps: Seq[(String, String)],
outputSerdeProps: Seq[(String, String)],
recordReaderClass: Option[String],
recordWriterClass: Option[String],
schemaLess: Boolean) {

def inputRowFormatSQL: Option[String] =
getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps)

def outputRowFormatSQL: Option[String] =
getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps)

/**
* Get the row format specification
* Note:
* 1. Changes are needed when readerClause and writerClause are supported.
* 2. Changes are needed when "ESCAPED BY" is supported.
*/
private def getRowFormatSQL(
rowFormat: Seq[(String, String)],
serdeClass: Option[String],
serdeProps: Seq[(String, String)]): Option[String] = {
if (schemaLess) return Some("")

val rowFormatDelimited =
rowFormat.map {
case ("TOK_TABLEROWFORMATFIELD", value) =>
"FIELDS TERMINATED BY " + value
case ("TOK_TABLEROWFORMATCOLLITEMS", value) =>
"COLLECTION ITEMS TERMINATED BY " + value
case ("TOK_TABLEROWFORMATMAPKEYS", value) =>
"MAP KEYS TERMINATED BY " + value
case ("TOK_TABLEROWFORMATLINES", value) =>
"LINES TERMINATED BY " + value
case ("TOK_TABLEROWFORMATNULL", value) =>
"NULL DEFINED AS " + value
case o => return None
}

val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("")
val serdePropsSQL =
if (serdeClass.nonEmpty) {
val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ")
if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else ""
} else {
""
}
if (rowFormat.nonEmpty) {
Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" "))
} else {
Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL)
}
}
}
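
To make the new case class concrete, here is a minimal usage sketch. The field values below are illustrative (the token names and the LazySimpleSerDe class name appear elsewhere in this diff); only `ScriptInputOutputSchema` and its `*RowFormatSQL` methods come from the code above.

```scala
import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema

// A delimited input format paired with a SerDe-based output format.
val schema = ScriptInputOutputSchema(
  inputRowFormat = Seq("TOK_TABLEROWFORMATFIELD" -> "','"),
  outputRowFormat = Nil,
  inputSerdeClass = None,
  outputSerdeClass = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
  inputSerdeProps = Nil,
  outputSerdeProps = Seq("field.delim" -> "\t"),
  recordReaderClass = None,
  recordWriterClass = None,
  schemaLess = false)

// Non-empty rowFormat pairs take the DELIMITED branch:
schema.inputRowFormatSQL
// => Some("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")

// An empty rowFormat plus a SerDe class takes the SERDE branch:
schema.outputRowFormatSQL
// => Some("ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES('field.delim' = '\t')")

// With schemaLess = true both methods short-circuit to Some("").
```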
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution

import scala.collection.JavaConverters._
import scala.util.Try

import org.antlr.v4.runtime.{ParserRuleContext, Token}

@@ -26,16 +27,27 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, DescribeCommand => _, _}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}


/**
* Concrete parser for Spark SQL statements.
*/
class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser{
class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
val astBuilder = new SparkSqlAstBuilder(conf)

private val substitutor = new VariableSubstitution(conf)

protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
super.parse(substitutor.substitute(command))(toResult)
}

protected override def nativeCommand(sqlText: String): LogicalPlan = {
HiveNativeCommand(substitutor.substitute(sqlText))
}
}
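
The net effect of the two overrides above is that Hive-style `${...}` variable references are expanded on the raw SQL text before it reaches either the ANTLR parser or the native-command fallback. A rough sketch of that flow, assuming a `SQLConf` instance is available from the session (the query text is a placeholder):

```scala
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}

val conf: SQLConf = ???  // provided by the session state in practice
val substitutor = new VariableSubstitution(conf)

// Variables are expanded against the conf first...
val substituted = substitutor.substitute("SELECT * FROM logs LIMIT ${limit}")

// ...and only the expanded text is handed on, either to super.parse(...)
// or, for statements Spark does not recognise, to HiveNativeCommand.
```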

/**
@@ -44,6 +56,14 @@ class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser{
class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
import org.apache.spark.sql.catalyst.parser.ParserUtils._

/**
* Pass a command to Hive using a [[HiveNativeCommand]].
*/
override def visitExecuteNativeCommand(
ctx: ExecuteNativeCommandContext): LogicalPlan = withOrigin(ctx) {
HiveNativeCommand(command(ctx))
}

/**
* Create a [[SetCommand]] logical plan.
*
@@ -1127,4 +1147,73 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
Option(col.STRING).map(string))
}
}

/**
* Create a [[ScriptInputOutputSchema]].
*/
override protected def withScriptIOSchema(
ctx: QuerySpecificationContext,
inRowFormat: RowFormatContext,
recordWriter: Token,
outRowFormat: RowFormatContext,
recordReader: Token,
schemaLess: Boolean): ScriptInputOutputSchema = {
if (recordWriter != null || recordReader != null) {
throw new ParseException(
"Unsupported operation: Used defined record reader/writer classes.", ctx)
}

// Decode an input/output format.
type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
def format(fmt: RowFormatContext, configKey: String): Format = fmt match {
case c: RowFormatDelimitedContext =>
// TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
// expects a seq of pairs in which the old parsers' token names are used as keys.
// Transforming the result of visitRowFormatDelimited would be quite a bit messier than
// retrieving the key value pairs ourselves.
def entry(key: String, value: Token): Seq[(String, String)] = {
Option(value).map(t => key -> t.getText).toSeq
}
val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++
entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++
entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++
entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)

(entries, None, Seq.empty, None)

case c: RowFormatSerdeContext =>
// Use a serde format.
val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c)

// SPARK-10310: Special cases LazySimpleSerDe
val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
Try(conf.getConfString(configKey)).toOption
} else {
None
}
(Seq.empty, Option(name), props.toSeq, recordHandler)

case null =>
// Use default (serde) format.
val name = conf.getConfString("hive.script.serde",
"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
val props = Seq("field.delim" -> "\t")
val recordHandler = Try(conf.getConfString(configKey)).toOption
(Nil, Option(name), props, recordHandler)
}

val (inFormat, inSerdeClass, inSerdeProps, reader) =
format(inRowFormat, "hive.script.recordreader")

val (outFormat, outSerdeClass, outSerdeProps, writer) =
format(outRowFormat, "hive.script.recordwriter")

ScriptInputOutputSchema(
inFormat, outFormat,
inSerdeClass, outSerdeClass,
inSerdeProps, outSerdeProps,
reader, writer,
schemaLess)
}
}
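
For orientation, the three cases of `format` map onto how a TRANSFORM query spells (or omits) its row format. A hedged sketch of the query shapes involved, assuming a Hive-enabled `SQLContext` named `sqlContext`; the table `src` and the script `/bin/cat` are placeholders:

```scala
// 1. RowFormatDelimitedContext: explicit DELIMITED clauses become
//    ("TOK_TABLEROWFORMATFIELD" -> "','") style pairs with no SerDe class.
sqlContext.sql(
  """SELECT TRANSFORM (key, value)
    |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    |USING '/bin/cat' AS (tKey, tValue)
    |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    |FROM src""".stripMargin)

// 2. RowFormatSerdeContext: an explicit SerDe class; the record reader/writer
//    conf keys are only consulted for LazySimpleSerDe (SPARK-10310).
sqlContext.sql(
  """SELECT TRANSFORM (key, value)
    |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    |USING '/bin/cat'
    |FROM src""".stripMargin)

// 3. No ROW FORMAT clause at all: the null branch picks the default SerDe from
//    "hive.script.serde" with a tab field delimiter.
sqlContext.sql("SELECT TRANSFORM (key, value) USING '/bin/cat' FROM src")
```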
@@ -15,22 +15,21 @@
* limitations under the License.
*/

package org.apache.spark.sql.hive.execution
package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hive.HiveSessionState
import org.apache.spark.sql.types.StringType

private[hive]
/**
* A command that we delegate to Hive. Eventually we should remove this.
*/
case class HiveNativeCommand(sql: String) extends RunnableCommand {

override def output: Seq[AttributeReference] =
Seq(AttributeReference("result", StringType, nullable = false)())

override def run(sqlContext: SQLContext): Seq[Row] = {
sqlContext.sessionState.asInstanceOf[HiveSessionState].runNativeSql(sql).map(Row(_))
sqlContext.sessionState.runNativeSql(sql).map(Row(_))
}

}
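
A rough sketch of how this command is reached and executed; the statement below is just a stand-in for anything Spark's own parser does not recognise, and the commented-out call assumes a Hive-enabled `SQLContext` named `sqlContext`:

```scala
import org.apache.spark.sql.execution.command.HiveNativeCommand

// SparkSqlParser.nativeCommand() wraps the (already variable-substituted)
// statement text verbatim in this logical plan.
val command = HiveNativeCommand("SHOW LOCKS")

// run() delegates to sessionState.runNativeSql: the Hive session state forwards
// the text to Hive, while the base SessionState raises an AnalysisException.
// command.run(sqlContext)  // Seq[Row] with one "result" string column
```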
@@ -165,7 +165,7 @@ private[sql] class SessionState(ctx: SQLContext) {
}

def runNativeSql(sql: String): Seq[String] = {
throw new UnsupportedOperationException
throw new AnalysisException("Unsupported query: " + sql)
}

}
@@ -611,26 +611,6 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed2, expected2)
}

test("unsupported operations") {
intercept[ParseException] {
parser.parsePlan("DROP TABLE tab PURGE")
}
intercept[ParseException] {
parser.parsePlan("DROP TABLE tab FOR REPLICATION('eventid')")
}
intercept[ParseException] {
parser.parsePlan(
"""
|CREATE EXTERNAL TABLE oneToTenDef
|USING org.apache.spark.sql.sources
|OPTIONS (from '1', to '10')
""".stripMargin)
}
intercept[ParseException] {
parser.parsePlan("SELECT TRANSFORM (key, value) USING 'cat' AS (tKey, tValue) FROM testData")
}
}

test("SPARK-14383: DISTRIBUTE and UNSET as non-keywords") {
val sql = "SELECT distribute, unset FROM x"
val parsed = parser.parsePlan(sql)
@@ -32,10 +32,9 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand}
import org.apache.spark.sql.execution.command.{CreateTableAsSelectLogicalPlan, CreateViewAsSelectLogicalCommand, HiveNativeCommand}
import org.apache.spark.sql.execution.datasources.{Partition => _, _}
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation}
import org.apache.spark.sql.hive.execution.HiveNativeCommand
import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource}
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.types._
@@ -20,8 +20,8 @@ package org.apache.spark.sql.hive
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand}
import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
import org.apache.spark.sql.execution.command.{ExecutedCommand, HiveNativeCommand, SetCommand}
import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand


/**
@@ -38,8 +38,9 @@ private[hive] trait HiveStrategies {

object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ScriptTransformation(input, script, output, child, schema: HiveScriptIOSchema) =>
ScriptTransformation(input, script, output, planLater(child), schema)(hiveconf) :: Nil
case logical.ScriptTransformation(input, script, output, child, ioschema) =>
val hiveIoSchema = HiveScriptIOSchema(ioschema)
ScriptTransformation(input, script, output, planLater(child), hiveIoSchema)(hiveconf) :: Nil
case _ => Nil
}
}
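
The strategy now accepts the generic `ScriptInputOutputSchema` produced by the parser and converts it at planning time. The conversion itself is not shown in this hunk; a plausible sketch is a field-by-field copy on the `HiveScriptIOSchema` companion (an assumption, not confirmed by the diff):

```scala
object HiveScriptIOSchema {
  // Hypothetical converter: lift the parser-level schema into the Hive-specific
  // one consumed by the physical ScriptTransformation operator.
  def apply(input: ScriptInputOutputSchema): HiveScriptIOSchema = {
    HiveScriptIOSchema(
      input.inputRowFormat,
      input.outputRowFormat,
      input.inputSerdeClass,
      input.outputSerdeClass,
      input.inputSerdeProps,
      input.outputSerdeProps,
      input.recordReaderClass,
      input.recordWriterClass,
      input.schemaLess)
  }
}
```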
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.execution.HiveScriptIOSchema
import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}

/**
@@ -210,13 +209,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Logging
}

private def scriptTransformationToSQL(plan: ScriptTransformation): String = {
val ioSchema = plan.ioschema.asInstanceOf[HiveScriptIOSchema]
val inputRowFormatSQL = ioSchema.inputRowFormatSQL.getOrElse(
val inputRowFormatSQL = plan.ioschema.inputRowFormatSQL.getOrElse(
throw new UnsupportedOperationException(
s"unsupported row format ${ioSchema.inputRowFormat}"))
val outputRowFormatSQL = ioSchema.outputRowFormatSQL.getOrElse(
s"unsupported row format ${plan.ioschema.inputRowFormat}"))
val outputRowFormatSQL = plan.ioschema.outputRowFormatSQL.getOrElse(
throw new UnsupportedOperationException(
s"unsupported row format ${ioSchema.outputRowFormat}"))
s"unsupported row format ${plan.ioschema.outputRowFormat}"))

val outputSchema = plan.output.map { attr =>
s"${attr.sql} ${attr.dataType.simpleString}"