@@ -137,7 +137,6 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
   protected val LAZY = Keyword("LAZY")
   protected val SET = Keyword("SET")
   protected val TABLE = Keyword("TABLE")
-  protected val SOURCE = Keyword("SOURCE")
   protected val UNCACHE = Keyword("UNCACHE")
 
   protected implicit def asParser(k: Keyword): Parser[String] =
@@ -152,8 +151,7 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
 
   override val lexical = new SqlLexical(reservedWords)
 
-  override protected lazy val start: Parser[LogicalPlan] =
-    cache | uncache | set | shell | source | others
+  override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | others
 
   private lazy val cache: Parser[LogicalPlan] =
     CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ {
@@ -171,16 +169,6 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
       case input => SetCommandParser(input)
     }
 
-  private lazy val shell: Parser[LogicalPlan] =
-    "!" ~> restInput ^^ {
-      case input => ShellCommand(input.trim)
-    }
-
-  private lazy val source: Parser[LogicalPlan] =
-    SOURCE ~> restInput ^^ {
-      case input => SourceCommand(input.trim)
-    }
-
   private lazy val others: Parser[LogicalPlan] =
     wholeInput ^^ {
      case input => fallback(input)
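
The `start` rule above is ordered alternation: each specialized command parser is tried in turn, and any input no rule matches falls through to `others`, which hands the whole string to the `fallback` function (with `shell` and `source` removed, "!" and "SOURCE" inputs now take that route too). A minimal standalone sketch of the same pattern, with illustrative names only (not Spark code; assumes the scala-parser-combinators library):

import scala.util.parsing.combinator.RegexParsers

object MiniCommandParser extends RegexParsers {
  sealed trait Cmd
  case class Cache(table: String) extends Cmd
  case class Fallback(sql: String) extends Cmd

  // Specialized rule: matches "CACHE TABLE <ident>", case-insensitively.
  private val cache: Parser[Cmd] = "(?i)CACHE\\s+TABLE".r ~> "\\w+".r ^^ Cache

  // Catch-all rule: consumes the entire remaining input, like `others` above.
  private val others: Parser[Cmd] = ".+".r ^^ Fallback

  // Ordered alternation: specialized rules first, catch-all last.
  val start: Parser[Cmd] = cache | others

  def apply(input: String): Cmd = parseAll(start, input).get
}

// MiniCommandParser("CACHE TABLE logs")   yields Cache("logs")
// MiniCommandParser("SELECT * FROM logs") yields Fallback("SELECT * FROM logs")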

@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.types.StringType
 
 /**
@@ -41,6 +41,14 @@ case class NativeCommand(cmd: String) extends Command {
 /**
- * Commands of the form "SET [key [= value] ]".
+ * Commands of the form "DFS <dfs command>".
  */
+case class DFSCommand(kv: Option[(String, Option[String])]) extends Command {
+  override def output = Seq(
+    AttributeReference("DFS output", StringType, nullable = false)())
+}
+
+/**
+ * Commands of the form "SET [key [= value] ]".
+ */
 case class SetCommand(kv: Option[(String, Option[String])]) extends Command {
   override def output = Seq(
     AttributeReference("", StringType, nullable = false)())
@@ -81,14 +90,3 @@ case class DescribeCommand(
     AttributeReference("data_type", StringType, nullable = false)(),
     AttributeReference("comment", StringType, nullable = false)())
 }
-
-/**
- * Returned for the "! shellCommand" command
- */
-case class ShellCommand(cmd: String) extends Command
-
-
-/**
- * Returned for the "SOURCE file" command
- */
-case class SourceCommand(filePath: String) extends Command
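
With ShellCommand and SourceCommand gone, the command nodes touched here share one contract: each advertises the schema of the rows it returns through `output`. A simplified, self-contained model of that contract (stub types, not Spark's actual classes):

// Stub stand-ins for Spark's Attribute/Command abstractions.
case class AttributeStub(name: String, dataType: String, nullable: Boolean)

trait CommandStub {
  def output: Seq[AttributeStub] // schema of the rows the command returns
}

// Mirrors DFSCommand above: a single non-nullable string column named "DFS output".
case class DfsCommandStub(kv: Option[(String, Option[String])]) extends CommandStub {
  override def output = Seq(AttributeStub("DFS output", "string", nullable = false))
}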

@@ -206,6 +206,8 @@ private[hive] trait HiveStrategies {
 
       case hive.AddJar(path) => execution.AddJar(path) :: Nil
 
+      case hive.AddFile(path) => execution.AddFile(path) :: Nil
+
       case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
 
       case describe: logical.DescribeCommand =>
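
The planner change is a one-line mapping: when the strategy sees the logical `hive.AddFile` node, it emits the physical `execution.AddFile` operator, and each case returns `:: Nil` because a command plans to exactly one physical operator. A toy version of that shape (illustrative types, not Spark's actual Strategy API):

sealed trait LogicalCmd
case class AddJarLogical(path: String) extends LogicalCmd
case class AddFileLogical(path: String) extends LogicalCmd

sealed trait PhysicalCmd
case class AddJarExec(path: String) extends PhysicalCmd
case class AddFileExec(path: String) extends PhysicalCmd

// Each logical command maps to exactly one physical operator, hence `:: Nil`.
def plan(cmd: LogicalCmd): List[PhysicalCmd] = cmd match {
  case AddJarLogical(p)  => AddJarExec(p) :: Nil
  case AddFileLogical(p) => AddFileExec(p) :: Nil
}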

@@ -76,3 +76,19 @@ case class AddJar(path: String) extends LeafNode with Command {
     Seq.empty[Row]
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class AddFile(path: String) extends LeafNode with Command {
+  def hiveContext = sqlContext.asInstanceOf[HiveContext]
+
+  override def output = Seq.empty
+
+  override protected lazy val sideEffectResult: Seq[Row] = {
+    hiveContext.runSqlHive(s"ADD FILE $path")
+    hiveContext.sparkContext.addFile(path)
+    Seq.empty[Row]
+  }
+}
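
Executing the command registers the file on two paths: with the Hive session via `runSqlHive`, so Hive scripts and SerDes see it, and with `sparkContext.addFile`, so executors fetch their own copy and can resolve it by base name through `SparkFiles.get`. A hedged usage sketch (the app setup and the /tmp/lookup.txt path are hypothetical; Spark 1.x-era API):

import java.io.File
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}
import org.apache.spark.sql.hive.HiveContext

object AddFileExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("add-file-demo").setMaster("local[2]"))
    val hive = new HiveContext(sc)

    hive.sql("ADD FILE /tmp/lookup.txt") // hypothetical local file

    // Inside tasks, SparkFiles.get resolves the fetched copy by base name.
    val allReadable = sc.parallelize(1 to 4, 2)
      .map(_ => new File(SparkFiles.get("lookup.txt")).canRead)
      .reduce(_ && _)

    println(s"lookup.txt readable in every task: $allReadable")
    sc.stop()
  }
}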

@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.hive.execution
 
+import java.io.File
+
 import scala.util.Try
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkFiles, SparkException}
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.test.TestHive
@@ -569,7 +571,7 @@ class HiveQuerySuite extends HiveComparisonTest {
         |WITH serdeproperties('s1'='9')
       """.stripMargin)
     }
-    // Now only verify 0.12.0, and ignore other versions due to binary compatability
+    // Now only verify 0.12.0, and ignore other versions due to binary compatibility
     // current TestSerDe.jar is from 0.12.0
     if (HiveShim.version == "0.12.0") {
       sql(s"ADD JAR $testJar")
@@ -581,6 +583,17 @@ class HiveQuerySuite extends HiveComparisonTest {
     sql("DROP TABLE alter1")
   }
 
+  test("ADD FILE command") {
+    val testFile = TestHive.getHiveFile("data/files/v1.txt").getCanonicalFile
+    sql(s"ADD FILE $testFile")
+
+    val checkAddFileRDD = sparkContext.parallelize(1 to 2, 1).mapPartitions { _ =>
+      Iterator.single(new File(SparkFiles.get("v1.txt")).canRead)
+    }
+
+    assert(checkAddFileRDD.first())
+  }
+
   case class LogEntry(filename: String, message: String)
   case class LogFile(name: String)

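The new test exercises the full path end to end: `ADD FILE` ships v1.txt through the command above, and the single-partition RDD forces the readability check to run inside a task, where `SparkFiles.get("v1.txt")` must resolve the fetched copy.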
Expand Down Expand Up @@ -816,7 +829,7 @@ class HiveQuerySuite extends HiveComparisonTest {

createQueryTest("select from thrift based table",
"SELECT * from src_thrift")

// Put tests that depend on specific Hive settings before these last two test,
// since they modify /clear stuff.
}