4 changes: 2 additions & 2 deletions project/SparkBuild.scala
@@ -367,7 +367,7 @@ object SparkBuild extends Build {
publish := {},

unidocProjectFilter in (ScalaUnidoc, unidoc) :=
inAnyProject -- inProjects(repl, examples, tools, yarn, yarnAlpha),
inAnyProject -- inProjects(repl, examples, tools, catalyst, yarn, yarnAlpha),
unidocProjectFilter in (JavaUnidoc, unidoc) :=
inAnyProject -- inProjects(repl, examples, bagel, graphx, catalyst, tools, yarn, yarnAlpha),

@@ -457,7 +457,7 @@ object SparkBuild extends Build {
def catalystSettings = sharedSettings ++ Seq(
name := "catalyst",
// The mechanics of rewriting expression ids to compare trees in some test cases makes
// assumptions about the the expression ids being contiguious. Running tests in parallel breaks
// assumptions about the the expression ids being contiguous. Running tests in parallel breaks
// this non-deterministically. TODO: FIX THIS.
parallelExecution in Test := false,
libraryDependencies ++= Seq(
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
/**
* Contains functions that are shared between all SchemaRDD types (i.e., Scala, Java)
*/
trait SchemaRDDLike {
private[sql] trait SchemaRDDLike {
@transient val sqlContext: SQLContext
@transient protected[spark] val logicalPlan: LogicalPlan

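For context on the visibility changes in this patch: `private[sql]` is Scala's package-qualified access modifier, so a trait like `SchemaRDDLike` stays visible to everything under `org.apache.spark.sql` while dropping out of the public API. A minimal sketch of the mechanism, with hypothetical package and class names:

```scala
// Hypothetical packages, purely to illustrate package-qualified visibility.
package org.apache.spark.sql {
  private[sql] class InternalHelper {       // visible to all code under ...spark.sql
    def answer: Int = 42
  }

  object InsideSqlPackage {
    val ok = new InternalHelper().answer    // compiles: same enclosing `sql` package
  }
}

package com.example.userapp {
  object OutsideSqlPackage {
    // new org.apache.spark.sql.InternalHelper()   // would not compile: not visible here
  }
}
```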
@@ -19,12 +19,14 @@ package org.apache.spark.sql.execution

import java.util.HashMap

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._

/**
* :: DeveloperApi ::
* Groups input data by `groupingExpressions` and computes the `aggregateExpressions` for each
* group.
*
@@ -34,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
* @param aggregateExpressions expressions that are computed for each group.
* @param child the input data source.
*/
@DeveloperApi
case class Aggregate(
partial: Boolean,
groupingExpressions: Seq[Expression],
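The pattern this PR applies throughout `sql/core` pairs the `@DeveloperApi` annotation from `org.apache.spark.annotation` with a `:: DeveloperApi ::` marker at the top of the scaladoc, so the documentation build can flag the class the same way the annotation does. A minimal sketch of the convention on a hypothetical class:

```scala
import org.apache.spark.annotation.DeveloperApi

/**
 * :: DeveloperApi ::
 * A low-level operator whose interface may change between minor releases.
 */
@DeveloperApi
class MyPhysicalOperator   // hypothetical name; the real operators extend SparkPlan
```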
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf}
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.sql.Row
@@ -26,6 +27,10 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.util.MutablePair

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {

override def outputPartitioning = newPartitioning
@@ -81,7 +86,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
* [[catalyst.plans.physical.Distribution Distribution]] requirements for each operator by inserting
* [[Exchange]] Operators where required.
*/
object AddExchange extends Rule[SparkPlan] {
private[sql] object AddExchange extends Rule[SparkPlan] {
// TODO: Determine the number of partitions.
val numPartitions = 150

@@ -17,9 +17,11 @@

package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions.{Generator, JoinedRow, Literal, Projection}

/**
* :: DeveloperApi ::
* Applies a [[catalyst.expressions.Generator Generator]] to a stream of input rows, combining the
* output of each into a new stream of rows. This operation is similar to a `flatMap` in functional
* programming with one important additional feature, which allows the input rows to be joined with
@@ -29,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Generator, JoinedRow, Literal,
* @param outer when true, each input row will be output at least once, even if the output of the
* given `generator` is empty. `outer` has no effect when `join` is false.
*/
@DeveloperApi
case class Generate(
generator: Generator,
join: Boolean,
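A rough sketch of the `join`/`outer` semantics described in the Generate scaladoc, written against plain Scala collections rather than catalyst rows (names and shapes here are illustrative, not the operator's actual implementation):

```scala
// `explode` stands in for an arbitrary Generator producing zero or more outputs per row.
def generateSketch[A, B](input: Seq[A], explode: A => Seq[B],
                         join: Boolean, outer: Boolean): Seq[Any] =
  input.flatMap { row =>
    val generated = explode(row)
    // `outer` only matters when joining and the generator produced nothing for this row.
    val padded =
      if (join && outer && generated.isEmpty) Seq(null.asInstanceOf[B]) else generated
    if (join) padded.map(g => (row, g)) else padded
  }

// generateSketch(Seq("a b", ""), (s: String) => s.split(" ").filter(_.nonEmpty).toSeq,
//                join = true, outer = true)
// => Seq(("a b", "a"), ("a b", "b"), ("", null))
```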
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Logging, Row}
import org.apache.spark.sql.catalyst.trees
@@ -26,6 +27,10 @@ import org.apache.spark.sql.catalyst.plans.{QueryPlan, logical}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.columnar.InMemoryColumnarTableScan

/**
* :: DeveloperApi ::
*/
@DeveloperApi
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
self: Product =>

@@ -51,6 +56,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
}

/**
* :: DeveloperApi ::
* Allows already planned SparkQueries to be linked into logical query plans.
*
* Note that in general it is not valid to use this class to link multiple copies of the same
@@ -59,6 +65,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
* replace the output attributes with new copies of themselves without breaking any attribute
* linking.
*/
@DeveloperApi
case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
extends logical.LogicalPlan with MultiInstanceRelation {

@@ -77,15 +84,15 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
}
}

trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] {
private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] {
self: Product =>
}

trait UnaryNode extends SparkPlan with trees.UnaryNode[SparkPlan] {
private[sql] trait UnaryNode extends SparkPlan with trees.UnaryNode[SparkPlan] {
self: Product =>
override def outputPartitioning: Partitioning = child.outputPartitioning
}

trait BinaryNode extends SparkPlan with trees.BinaryNode[SparkPlan] {
private[sql] trait BinaryNode extends SparkPlan with trees.BinaryNode[SparkPlan] {
self: Product =>
}
@@ -27,7 +27,7 @@ import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.MutablePair
import org.apache.spark.util.Utils

class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
override def newKryo(): Kryo = {
val kryo = new Kryo()
kryo.setRegistrationRequired(false)
@@ -50,7 +50,7 @@ class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
}
}

object SparkSqlSerializer {
private[sql] object SparkSqlSerializer {
// TODO (lian) Using KryoSerializer here is workaround, needs further investigation
// Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization
// related error.
@@ -68,7 +68,7 @@ object SparkSqlSerializer {
}
}

class BigDecimalSerializer extends Serializer[BigDecimal] {
private[sql] class BigDecimalSerializer extends Serializer[BigDecimal] {
def write(kryo: Kryo, output: Output, bd: math.BigDecimal) {
// TODO: There are probably more efficient representations than strings...
output.writeString(bd.toString())
@@ -83,7 +83,7 @@ class BigDecimalSerializer extends Serializer[BigDecimal] {
* Maps do not have a no arg constructor and so cannot be serialized by default. So, we serialize
* them as `Array[(k,v)]`.
*/
class MapSerializer extends Serializer[Map[_,_]] {
private[sql] class MapSerializer extends Serializer[Map[_,_]] {
def write(kryo: Kryo, output: Output, map: Map[_,_]) {
kryo.writeObject(output, map.flatMap(e => Seq(e._1, e._2)).toArray)
}
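The matching `read` method of `MapSerializer` falls outside the hunk shown above. A hedged sketch of what a compatible round trip looks like, assuming only the interleaved `Array(k1, v1, k2, v2, ...)` layout that the `write` method produces (class name is hypothetical):

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

class MapRoundTripSerializer extends Serializer[Map[_, _]] {
  def write(kryo: Kryo, output: Output, map: Map[_, _]) {
    // Flatten the map into an interleaved key/value array, as in the diff above.
    kryo.writeObject(output, map.flatMap(e => Seq(e._1, e._2)).toArray)
  }

  def read(kryo: Kryo, input: Input, tpe: Class[Map[_, _]]): Map[_, _] = {
    // Read the interleaved array back and re-pair it into a Map.
    kryo.readObject(input, classOf[Array[Any]])
      .grouped(2)
      .map { case Array(k, v) => k -> v }
      .toMap
  }
}
```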
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.parquet._

abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>

object HashJoin extends Strategy {
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution

import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.sql.catalyst.ScalaReflection
@@ -27,6 +28,10 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, UnspecifiedDistribution}
import org.apache.spark.util.MutablePair

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
override def output = projectList.map(_.toAttribute)

@@ -36,6 +41,10 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
}
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
override def output = child.output

@@ -44,6 +53,10 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
}
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: SparkPlan)
extends UnaryNode {

@@ -53,6 +66,10 @@ case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child:
override def execute() = child.execute().sample(withReplacement, fraction, seed)
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class Union(children: Seq[SparkPlan])(@transient sc: SparkContext) extends SparkPlan {
// TODO: attributes output by union should be distinct for nullability purposes
override def output = children.head.output
@@ -62,12 +79,14 @@ case class Union(children: Seq[SparkPlan])(@transient sc: SparkContext) extends
}

/**
* :: DeveloperApi ::
* Take the first limit elements. Note that the implementation is different depending on whether
* this is a terminal operator or not. If it is terminal and is invoked using executeCollect,
* this operator uses Spark's take method on the Spark driver. If it is not terminal or is
* invoked using execute, we first take the limit on each partition, and then repartition all the
* data to a single partition to compute the global limit.
*/
@DeveloperApi
case class Limit(limit: Int, child: SparkPlan)(@transient sc: SparkContext) extends UnaryNode {
// TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
// partition local limit -> exchange into one partition -> partition local limit again
@@ -91,10 +110,12 @@ case class Limit(limit: Int, child: SparkPlan)(@transient sc: SparkContext) exte
}
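The two-phase strategy described in the Limit scaladoc can be sketched with plain RDD operations; this only illustrates the idea and is not the operator's actual shuffle-based implementation:

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def limitSketch[T: ClassTag](rdd: RDD[T], n: Int): RDD[T] =
  rdd.mapPartitions(_.take(n))   // partition-local limit first
     .repartition(1)             // move the survivors to a single partition
     .mapPartitions(_.take(n))   // apply the limit once more for the global result
```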

/**
* :: DeveloperApi ::
* Take the first limit elements as defined by the sortOrder. This is logically equivalent to
* having a [[Limit]] operator after a [[Sort]] operator. This could have been named TopK, but
* Spark's top operator does the opposite in ordering so we name it TakeOrdered to avoid confusion.
*/
@DeveloperApi
case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
(@transient sc: SparkContext) extends UnaryNode {
override def otherCopyArgs = sc :: Nil
@@ -111,7 +132,10 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
override def execute() = sc.makeRDD(executeCollect(), 1)
}
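To illustrate the naming note above, a spark-shell-style snippet (using the shell's predefined `sc`):

```scala
val nums = sc.parallelize(Seq(3, 1, 2))
nums.takeOrdered(2)  // Array(1, 2): smallest first, the ordering this operator follows
nums.top(2)          // Array(3, 2): largest first, the opposite ordering
```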


/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class Sort(
sortOrder: Seq[SortOrder],
global: Boolean,
@@ -134,6 +158,10 @@ case class Sort(
override def output = child.output
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
object ExistingRdd {
def convertToCatalyst(a: Any): Any = a match {
case s: Seq[Any] => s.map(convertToCatalyst)
@@ -167,6 +195,10 @@ object ExistingRdd {
}
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
override def execute() = rdd
}
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution

object DebugQuery {
private[sql] object DebugQuery {
def apply(plan: SparkPlan): SparkPlan = {
val visited = new collection.mutable.HashSet[Long]()
plan transform {
@@ -28,7 +28,7 @@ object DebugQuery {
}
}

case class DebugNode(child: SparkPlan) extends UnaryNode {
private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode {
def references = Set.empty
def output = child.output
def execute() = {
18 changes: 18 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
@@ -21,14 +21,24 @@ import scala.collection.mutable.{ArrayBuffer, BitSet}

import org.apache.spark.SparkContext

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Partitioning}

@DeveloperApi
sealed abstract class BuildSide

@DeveloperApi
case object BuildLeft extends BuildSide

@DeveloperApi
case object BuildRight extends BuildSide

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class HashJoin(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
@@ -130,6 +140,10 @@ case class HashJoin(
}
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
def output = left.output ++ right.output

@@ -138,6 +152,10 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNod
}
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class BroadcastNestedLoopJoin(
streamed: SparkPlan, broadcast: SparkPlan, joinType: JoinType, condition: Option[Expression])
(@transient sc: SparkContext)
@@ -18,6 +18,7 @@
package org.apache.spark.sql

/**
* :: DeveloperApi ::
* An execution engine for relational query plans that runs on top Spark and returns RDDs.
*
* Note that the operators in this package are created automatically by a query planner using a
@@ -28,7 +28,7 @@ import parquet.schema.{MessageType, MessageTypeParser}
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.util.Utils

object ParquetTestData {
private[sql] object ParquetTestData {

val testSchema =
"""message myrecord {