Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected implicit def asParser(k: Keyword): Parser[String] =
lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)

protected val ABS = Keyword("ABS")
protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
protected val APPROXIMATE = Keyword("APPROXIMATE")
protected val AS = Keyword("AS")
protected val ASC = Keyword("ASC")
protected val APPROXIMATE = Keyword("APPROXIMATE")
protected val AVG = Keyword("AVG")
protected val BETWEEN = Keyword("BETWEEN")
protected val BY = Keyword("BY")
Expand All @@ -80,9 +81,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected val COUNT = Keyword("COUNT")
protected val DESC = Keyword("DESC")
protected val DISTINCT = Keyword("DISTINCT")
protected val EXCEPT = Keyword("EXCEPT")
protected val FALSE = Keyword("FALSE")
protected val FIRST = Keyword("FIRST")
protected val LAST = Keyword("LAST")
protected val FROM = Keyword("FROM")
protected val FULL = Keyword("FULL")
protected val GROUP = Keyword("GROUP")
Expand All @@ -91,42 +92,42 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected val IN = Keyword("IN")
protected val INNER = Keyword("INNER")
protected val INSERT = Keyword("INSERT")
protected val INTERSECT = Keyword("INTERSECT")
protected val INTO = Keyword("INTO")
protected val IS = Keyword("IS")
protected val JOIN = Keyword("JOIN")
protected val LAST = Keyword("LAST")
protected val LAZY = Keyword("LAZY")
protected val LEFT = Keyword("LEFT")
protected val LIKE = Keyword("LIKE")
protected val LIMIT = Keyword("LIMIT")
protected val LOWER = Keyword("LOWER")
protected val MAX = Keyword("MAX")
protected val MIN = Keyword("MIN")
protected val NOT = Keyword("NOT")
protected val NULL = Keyword("NULL")
protected val ON = Keyword("ON")
protected val OR = Keyword("OR")
protected val OVERWRITE = Keyword("OVERWRITE")
protected val LIKE = Keyword("LIKE")
protected val RLIKE = Keyword("RLIKE")
protected val UPPER = Keyword("UPPER")
protected val LOWER = Keyword("LOWER")
protected val REGEXP = Keyword("REGEXP")
protected val ORDER = Keyword("ORDER")
protected val OUTER = Keyword("OUTER")
protected val OVERWRITE = Keyword("OVERWRITE")
protected val REGEXP = Keyword("REGEXP")
protected val RIGHT = Keyword("RIGHT")
protected val RLIKE = Keyword("RLIKE")
protected val SELECT = Keyword("SELECT")
protected val SEMI = Keyword("SEMI")
protected val SQRT = Keyword("SQRT")
protected val STRING = Keyword("STRING")
protected val SUBSTR = Keyword("SUBSTR")
protected val SUBSTRING = Keyword("SUBSTRING")
protected val SUM = Keyword("SUM")
protected val TABLE = Keyword("TABLE")
protected val TIMESTAMP = Keyword("TIMESTAMP")
protected val TRUE = Keyword("TRUE")
protected val UNCACHE = Keyword("UNCACHE")
protected val UNION = Keyword("UNION")
protected val UPPER = Keyword("UPPER")
protected val WHERE = Keyword("WHERE")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the keyword LAZY and sorted all the keywords in alphabetical order here. This list was once sorted, but the ordering was broken later.

protected val INTERSECT = Keyword("INTERSECT")
protected val EXCEPT = Keyword("EXCEPT")
protected val SUBSTR = Keyword("SUBSTR")
protected val SUBSTRING = Keyword("SUBSTRING")
protected val SQRT = Keyword("SQRT")
protected val ABS = Keyword("ABS")

// Use reflection to find the reserved words defined in this class.
protected val reservedWords =
Expand Down Expand Up @@ -183,17 +184,15 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
}

protected lazy val cache: Parser[LogicalPlan] =
CACHE ~ TABLE ~> ident ~ opt(AS ~> select) <~ opt(";") ^^ {
case tableName ~ None =>
CacheCommand(tableName, true)
case tableName ~ Some(plan) =>
CacheTableAsSelectCommand(tableName, plan)
CACHE ~> opt(LAZY) ~ (TABLE ~> ident) ~ opt(AS ~> select) <~ opt(";") ^^ {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor styling question, do we prefer this style or the following one:

CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> select).? <~ ";".? ^^ {
  ...
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have been using the opt style, but only because I didn't know about the other one.

Also, we can probably omit the `;` handling. It is not supported by any of the other parsers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All occurrences of the optional trailing `;` in CACHE, UNCACHE, SELECT and INSERT have been removed in #2698.

case isLazy ~ tableName ~ plan =>
CacheTableCommand(tableName, plan, isLazy.isDefined)
}

protected lazy val unCache: Parser[LogicalPlan] =
UNCACHE ~ TABLE ~> ident <~ opt(";") ^^ {
case tableName => CacheCommand(tableName, false)
}
case tableName => UncacheTableCommand(tableName)
}

protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")

Expand Down Expand Up @@ -283,7 +282,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
termExpression ~ ">=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => GreaterThanOrEqual(e1, e2) } |
termExpression ~ "!=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Not(EqualTo(e1, e2)) } |
termExpression ~ "<>" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Not(EqualTo(e1, e2)) } |
termExpression ~ BETWEEN ~ termExpression ~ AND ~ termExpression ^^ {
termExpression ~ BETWEEN ~ termExpression ~ AND ~ termExpression ^^ {
case e ~ _ ~ el ~ _ ~ eu => And(GreaterThanOrEqual(e, el), LessThanOrEqual(e, eu))
} |
termExpression ~ RLIKE ~ termExpression ^^ { case e1 ~ _ ~ e2 => RLike(e1, e2) } |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
tableName: String,
alias: Option[String] = None): LogicalPlan = {
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
val table = tables.get(tblName).getOrElse(sys.error(s"Table Not Found: $tableName"))
val table = tables.getOrElse(tblName, sys.error(s"Table Not Found: $tableName"))
val tableWithQualifiers = Subquery(tblName, table)

// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,15 @@ case class ExplainCommand(plan: LogicalPlan, extended: Boolean = false) extends
}

/**
* Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
* Returned for the "CACHE TABLE tableName [AS SELECT ...]" command.
*/
case class CacheCommand(tableName: String, doCache: Boolean) extends Command
case class CacheTableCommand(tableName: String, plan: Option[LogicalPlan], isLazy: Boolean)
extends Command

/**
* Returned for the "UNCACHE TABLE tableName" command.
*/
case class UncacheTableCommand(tableName: String) extends Command

/**
* Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
Expand All @@ -75,8 +81,3 @@ case class DescribeCommand(
AttributeReference("data_type", StringType, nullable = false)(),
AttributeReference("comment", StringType, nullable = false)())
}

/**
* Returned for the "CACHE TABLE tableName AS SELECT .." command.
*/
case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) extends Command
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,10 @@ private[sql] trait CacheManager {
}

/** Removes the data for the given SchemaRDD from the cache */
private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = false): Unit = writeLock {
private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = true): Unit = writeLock {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marmbrus Forgot to confirm this with you: the default value of the blocking argument is true in RDD.unpersist(), so I changed the default value here to keep the semantics consistent. This also makes testing easier (I added assertions to check RDD materialization, and non-blocking unpersisting introduces some subtleties). Did you intend to use non-blocking unpersisting here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I mistakenly thought that was the default. We should match the original semantics.

val planToCache = query.queryExecution.optimizedPlan
val dataIndex = cachedData.indexWhere(_.plan.sameResult(planToCache))

if (dataIndex < 0) {
throw new IllegalArgumentException(s"Table $query is not cached.")
}

require(dataIndex >= 0, s"Table $query is not cached.")
cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
cachedData.remove(dataIndex)
}
Expand Down Expand Up @@ -135,5 +131,4 @@ private[sql] trait CacheManager {
case _ =>
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private[sql] case class InMemoryRelation(

override def newInstance() = {
new InMemoryRelation(
output.map(_.newInstance),
output.map(_.newInstance()),
useCompression,
batchSize,
storageLevel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
Seq(execution.SetCommand(key, value, plan.output)(context))
case logical.ExplainCommand(logicalPlan, extended) =>
Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context))
case logical.CacheCommand(tableName, cache) =>
Seq(execution.CacheCommand(tableName, cache)(context))
case logical.CacheTableAsSelectCommand(tableName, plan) =>
Seq(execution.CacheTableAsSelectCommand(tableName, plan))
case logical.CacheTableCommand(tableName, optPlan, isLazy) =>
Seq(execution.CacheTableCommand(tableName, optPlan, isLazy))
case logical.UncacheTableCommand(tableName) =>
Seq(execution.UncacheTableCommand(tableName))
case _ => Nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,49 +138,54 @@ case class ExplainCommand(
* :: DeveloperApi ::
*/
@DeveloperApi
case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
case class CacheTableCommand(
tableName: String,
plan: Option[LogicalPlan],
isLazy: Boolean)
extends LeafNode with Command {

override protected lazy val sideEffectResult = {
if (doCache) {
context.cacheTable(tableName)
} else {
context.uncacheTable(tableName)
import sqlContext._

plan.foreach(_.registerTempTable(tableName))
val schemaRDD = table(tableName)
schemaRDD.cache()

if (!isLazy) {
// Performs eager caching
schemaRDD.count()
}

Seq.empty[Row]
}

override def output: Seq[Attribute] = Seq.empty
}


/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
@transient context: SQLContext)
extends LeafNode with Command {

case class UncacheTableCommand(tableName: String) extends LeafNode with Command {
override protected lazy val sideEffectResult: Seq[Row] = {
Row("# Registered as a temporary table", null, null) +:
child.output.map(field => Row(field.name, field.dataType.toString, null))
sqlContext.table(tableName).unpersist()
Seq.empty[Row]
}

override def output: Seq[Attribute] = Seq.empty
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class CacheTableAsSelectCommand(tableName: String, logicalPlan: LogicalPlan)
case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
@transient context: SQLContext)
extends LeafNode with Command {

override protected[sql] lazy val sideEffectResult = {
import sqlContext._
logicalPlan.registerTempTable(tableName)
cacheTable(tableName)
Seq.empty[Row]
}

override def output: Seq[Attribute] = Seq.empty

override protected lazy val sideEffectResult: Seq[Row] = {
Row("# Registered as a temporary table", null, null) +:
child.output.map(field => Row(field.name, field.dataType.toString, null))
}
}
Loading