@@ -88,8 +88,10 @@ object ScalaReflection extends ScalaReflection {
   }
 
   /**
-   * Given a type `T` this function constructs and ObjectType that holds a class of type
-   * Array[T]. Special handling is performed for primitive types to map them back to their raw
+   * Given a type `T` this function constructs `ObjectType` that holds a class of type
+   * `Array[T]`.
+   *
+   * Special handling is performed for primitive types to map them back to their raw
    * JVM form instead of the Scala Array that handles auto boxing.
    */
   private def arrayClassFor(tpe: `Type`): ObjectType = ScalaReflectionLock.synchronized {
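For a concrete sense of the special handling above: Scala's `Array[Int]` erases to the raw JVM `int[]`, and reflection can recover that class directly. A minimal, self-contained sketch (`RawArrayClassDemo` and `rawArrayClassFor` are illustrative names, not Spark's implementation):

```scala
// Maps an element Class to its raw JVM array class; primitive element types
// yield int[], long[], etc. rather than arrays of boxed wrappers.
object RawArrayClassDemo {
  def rawArrayClassFor(elem: Class[_]): Class[_] =
    java.lang.reflect.Array.newInstance(elem, 0).getClass

  def main(args: Array[String]): Unit = {
    println(rawArrayClassFor(java.lang.Integer.TYPE)) // class [I, the raw int[]
    println(rawArrayClassFor(classOf[String]))        // class [Ljava.lang.String;
    // Scala's Array[Int] is already the raw primitive array class:
    println(classOf[Array[Int]] == rawArrayClassFor(java.lang.Integer.TYPE)) // true
  }
}
```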
@@ -85,8 +85,7 @@ object AnalysisContext {
 
 /**
  * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
- * [[UnresolvedRelation]]s into fully typed objects using information in a
- * [[SessionCatalog]] and a [[FunctionRegistry]].
+ * [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]].
  */
 class Analyzer(
     catalog: SessionCatalog,
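The analyzer's effect is easy to observe from user code: the parsed plan still contains unresolved names, while the analyzed plan has bound them to typed attributes. A small runnable sketch, assuming a local Spark build (`AnalyzerDemo` is an illustrative name):

```scala
import org.apache.spark.sql.SparkSession

object AnalyzerDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("analyzer-demo").getOrCreate()
    val df = spark.range(3).selectExpr("id + 1 AS x")
    // Before analysis: `id` is still an unresolved attribute in the parsed plan.
    println(df.queryExecution.logical)
    // After analysis: attributes are resolved to typed references via the catalog.
    println(df.queryExecution.analyzed)
    spark.stop()
  }
}
```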
@@ -1910,7 +1909,7 @@ class Analyzer(
    * `[Sum(_w0) OVER (PARTITION BY _w1 ORDER BY _w2)]` and the second returned value will be
    * [col1, col2 + col3 as _w0, col4 as _w1, col5 as _w2].
    *
-   * @return (seq of expressions containing at lease one window expressions,
+   * @return (seq of expressions containing at least one window expression,
    *         seq of non-window expressions)
    */
   private def extract(
@@ -208,7 +208,8 @@ object ExpressionEncoder {
 }
 
 /**
- * A generic encoder for JVM objects.
+ * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer`
+ * and a `deserializer`.
  *
  * @param schema The schema after converting `T` to a Spark SQL row.
  * @param serializer A set of expressions, one for each top-level field that can be used to
@@ -235,7 +236,7 @@ case class ExpressionEncoder[T](
   assert(serializer.flatMap { ser =>
     val boundRefs = ser.collect { case b: BoundReference => b }
     assert(boundRefs.nonEmpty,
-      "each serializer expression should contains at least one `BoundReference`")
+      "each serializer expression should contain at least one `BoundReference`")
     boundRefs
   }.distinct.length <= 1, "all serializer expressions must use the same BoundReference.")
 
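To make the serializer/deserializer pairing concrete, here is a small sketch against the internal, version-dependent Spark 2.x encoder API (`Person` and `EncoderDemo` are made-up names):

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

object EncoderDemo {
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val enc = ExpressionEncoder[Person]()
    println(enc.schema)             // one StructField per case-class field
    enc.serializer.foreach(println) // one Catalyst expression per top-level field
  }
}
```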
@@ -800,7 +800,7 @@ class CodegenContext {
 
   /**
    * Generates code for expressions. If doSubexpressionElimination is true, subexpression
-   * elimination will be performed. Subexpression elimination assumes that the code will for each
+   * elimination will be performed. Subexpression elimination assumes that the code for each
    * expression will be combined in the `expressions` order.
    */
   def generateExpressions(expressions: Seq[Expression],
@@ -23,11 +23,11 @@ import org.apache.spark.sql.types._
 /**
  * Generates a [[Projection]] that returns an [[UnsafeRow]].
  *
- * It generates the code for all the expressions, compute the total length for all the columns
- * (can be accessed via variables), and then copy the data into a scratch buffer space in the
+ * It generates the code for all the expressions, computes the total length for all the columns
+ * (can be accessed via variables), and then copies the data into a scratch buffer space in the
  * form of UnsafeRow (the scratch buffer will grow as needed).
  *
- * Note: The returned UnsafeRow will be pointed to a scratch buffer inside the projection.
+ * @note The returned UnsafeRow points to a scratch buffer inside the projection.
 */
 object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafeProjection] {
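A sketch of how the generated projection behaves, using the internal (unstable) `UnsafeProjection.create` entry point; note the explicit `copy()` because of the scratch-buffer reuse mentioned in the `@note`:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

object UnsafeProjectionDemo {
  def main(args: Array[String]): Unit = {
    val schema = new StructType().add("id", LongType).add("name", StringType)
    val project = UnsafeProjection.create(schema)
    val row = project(InternalRow(1L, UTF8String.fromString("a")))
    // The returned UnsafeRow points into the projection's scratch buffer,
    // so copy it if it must survive the next call to `project`.
    val kept = row.copy()
    println(s"${kept.getLong(0)} ${kept.getUTF8String(1)}")
  }
}
```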
@@ -113,7 +113,7 @@ sealed trait FrameType
  * or a [[ValueFollowing]] is used as its [[FrameBoundary]], the value is considered
  * as a physical offset.
  * For example, `ROW BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a 3-row frame,
- * from the row precedes the current row to the row follows the current row.
+ * from the row that precedes the current row to the row that follows the current row.
  */
 case object RowFrame extends FrameType
@@ -126,7 +126,7 @@ case object RowFrame extends FrameType
  * `RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a frame containing rows whose values
  * `expr` are in the range of [v-1, v+1].
  *
- * If `ORDER BY` clause is not defined, all rows in the partition is considered as peers
+ * If `ORDER BY` clause is not defined, all rows in the partition are considered as peers
  * of the current row.
  */
 case object RangeFrame extends FrameType
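The ROW/RANGE distinction maps directly onto the public DataFrame API (`Window.rowsBetween` vs `Window.rangeBetween`). A runnable sketch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object FrameDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("frame-demo").getOrCreate()
    import spark.implicits._

    val df = Seq(("a", 1), ("a", 2), ("a", 4)).toDF("k", "v")
    // ROW frame: physical offsets, the previous row through the next row.
    val byRows = Window.partitionBy("k").orderBy("v").rowsBetween(-1, 1)
    // RANGE frame: logical offsets, rows whose `v` lies in [v - 1, v + 1].
    val byRange = Window.partitionBy("k").orderBy("v").rangeBetween(-1, 1)

    // For the row with v = 4: rows_sum = 2 + 4 = 6, but range_sum = 4,
    // because no other row's value falls in [3, 5].
    df.select($"k", $"v",
      sum("v").over(byRows).as("rows_sum"),
      sum("v").over(byRange).as("range_sum")).show()
    spark.stop()
  }
}
```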
@@ -217,11 +217,11 @@ case object UnboundedFollowing extends FrameBoundary {
 }
 
 /**
- * The trait used to represent the a Window Frame.
+ * Represents a window frame.
  */
 sealed trait WindowFrame
 
-/** Used as a place holder when a frame specification is not defined. */
+/** Used as a placeholder when a frame specification is not defined. */
 case object UnspecifiedFrame extends WindowFrame
 
 /** A specified Window Frame. */
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
 
 /**
  * Given a [[LogicalPlan]], returns a list of `PhysicalPlan`s that can
- * be used for execution. If this strategy does not apply to the give logical operation then an
+ * be used for execution. If this strategy does not apply to the given logical operation then an
  * empty list should be returned.
  */
 abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging {
@@ -42,9 +42,10 @@ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends L
  * Abstract class for transforming [[LogicalPlan]]s into physical plans.
  * Child classes are responsible for specifying a list of [[GenericStrategy]] objects that
  * each of which can return a list of possible physical plan options.
- * If a given strategy is unable to plan all
- * of the remaining operators in the tree, it can call [[planLater]], which returns a placeholder
- * object that will be filled in using other available strategies.
+ * If a given strategy is unable to plan all of the remaining operators in the tree,
+ * it can call [[GenericStrategy#planLater planLater]], which returns a placeholder
+ * object that will be [[collectPlaceholders collected]] and filled in
+ * using other available strategies.
  *
  * TODO: RIGHT NOW ONLY ONE PLAN IS RETURNED EVER...
  * PLAN SPACE EXPLORATION WILL BE IMPLEMENTED LATER.
@@ -93,7 +94,10 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
     pruned
   }
 
-  /** Collects placeholders marked as [[planLater]] by strategy and its [[LogicalPlan]]s */
+  /**
+   * Collects placeholders marked using [[GenericStrategy#planLater planLater]]
+   * by [[strategies]].
+   */
   protected def collectPlaceholders(plan: PhysicalPlan): Seq[(PhysicalPlan, LogicalPlan)]
 
   /** Prunes bad plans to prevent combinatorial explosion. */
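A sketch of how a strategy defers child planning with `planLater`, written against Spark 2.x internals (the strategy itself and its registration line are illustrative, not part of this patch):

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.{Limit, LogicalPlan}
import org.apache.spark.sql.execution.{LocalLimitExec, SparkPlan}

// Plans only Limit and leaves a placeholder for its child; the QueryPlanner
// later collects the placeholder and fills it in with another strategy's plan.
object LimitOnlyStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case Limit(limitExpr, child) =>
      LocalLimitExec(limitExpr.eval().asInstanceOf[Int], planLater(child)) :: Nil
    case _ => Nil
  }
}

// Registration, e.g.: spark.experimental.extraStrategies = Seq(LimitOnlyStrategy)
```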
@@ -519,7 +519,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   protected def innerChildren: Seq[TreeNode[_]] = Seq.empty
 
   /**
-   * Appends the string represent of this node and its children to the given StringBuilder.
+   * Appends the string representation of this node and its children to the given StringBuilder.
    *
    * The `i`-th element in `lastChildren` indicates whether the ancestor of the current node at
    * depth `i + 1` is the last child of its own parent node. The depth of the root node is 0, and
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Column.scala

@@ -1152,7 +1152,7 @@ class Column(val expr: Expression) extends Logging {
   def bitwiseXOR(other: Any): Column = withExpr { BitwiseXor(expr, lit(other).expr) }
 
   /**
-   * Define a windowing column.
+   * Defines a windowing column.
    *
    * {{{
    *   val w = Window.partitionBy("name").orderBy("id")
@@ -1168,7 +1168,7 @@ class Column(val expr: Expression) extends Logging {
   def over(window: expressions.WindowSpec): Column = window.withAggregate(this)
 
   /**
-   * Define a empty analytic clause. In this case the analytic function is applied
+   * Defines an empty analytic clause. In this case the analytic function is applied
    * and presented for all rows in the result set.
    *
    * {{{
@@ -35,12 +35,13 @@ import org.apache.spark.sql.types.NumericType
 import org.apache.spark.sql.types.StructType
 
 /**
- * A set of methods for aggregations on a `DataFrame`, created by `Dataset.groupBy`.
+ * A set of methods for aggregations on a `DataFrame`, created by [[Dataset#groupBy groupBy]],
+ * [[Dataset#cube cube]] or [[Dataset#rollup rollup]] (and also `pivot`).
  *
- * The main method is the agg function, which has multiple variants. This class also contains
- * convenience some first order statistics such as mean, sum for convenience.
+ * The main method is the `agg` function, which has multiple variants. This class also contains
+ * some first-order statistics such as `mean`, `sum` for convenience.
  *
- * This class was named `GroupedData` in Spark 1.x.
+ * @note This class was named `GroupedData` in Spark 1.x.
  *
  * @since 2.0.0
  */
@@ -297,8 +298,9 @@ class RelationalGroupedDataset protected[sql](
   }
 
   /**
-   * Pivots a column of the current `DataFrame` and perform the specified aggregation.
-   * There are two versions of pivot function: one that requires the caller to specify the list
+   * Pivots a column of the current `DataFrame` and performs the specified aggregation.
+   *
+   * There are two versions of `pivot` function: one that requires the caller to specify the list
    * of distinct values to pivot on, and one that does not. The latter is more concise but less
    * efficient, because Spark needs to first compute the list of distinct values internally.
@@ -337,7 +339,7 @@ class RelationalGroupedDataset protected[sql](
   }
 
   /**
-   * Pivots a column of the current `DataFrame` and perform the specified aggregation.
+   * Pivots a column of the current `DataFrame` and performs the specified aggregation.
    * There are two versions of pivot function: one that requires the caller to specify the list
    * of distinct values to pivot on, and one that does not. The latter is more concise but less
    * efficient, because Spark needs to first compute the list of distinct values internally.
@@ -369,7 +371,9 @@ class RelationalGroupedDataset protected[sql](
   }
 
   /**
-   * Pivots a column of the current `DataFrame` and perform the specified aggregation.
+   * (Java-specific) Pivots a column of the current `DataFrame` and performs the specified
+   * aggregation.
+   *
    * There are two versions of pivot function: one that requires the caller to specify the list
    * of distinct values to pivot on, and one that does not. The latter is more concise but less
    * efficient, because Spark needs to first compute the list of distinct values internally.
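Both pivot variants in action, a runnable sketch illustrating the values-vs-no-values tradeoff described above (the sample data is made up):

```scala
import org.apache.spark.sql.SparkSession

object PivotDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("pivot-demo").getOrCreate()
    import spark.implicits._

    val df = Seq(
      (2012, "Java", 20000), (2012, "dotNET", 5000),
      (2013, "Java", 30000), (2013, "dotNET", 48000)
    ).toDF("year", "course", "earnings")

    // Explicit values: no extra pass to discover the distinct courses.
    df.groupBy("year").pivot("course", Seq("dotNET", "Java")).sum("earnings").show()
    // Without values: more concise, but Spark first computes the distinct courses.
    df.groupBy("year").pivot("course").sum("earnings").show()
    spark.stop()
  }
}
```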
@@ -433,10 +437,6 @@ class RelationalGroupedDataset protected[sql](
   }
 }
 
-
-/**
- * Companion object for GroupedData.
- */
 private[sql] object RelationalGroupedDataset {
 
   def apply(
@@ -72,24 +72,24 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   /**
-   * Return all metadata that describes more details of this SparkPlan.
+   * @return Metadata that describes more details of this SparkPlan.
    */
   def metadata: Map[String, String] = Map.empty
 
   /**
-   * Return all metrics containing metrics of this SparkPlan.
+   * @return All metrics of this SparkPlan.
    */
   def metrics: Map[String, SQLMetric] = Map.empty
 
   /**
-   * Reset all the metrics.
+   * Resets all the metrics.
    */
   def resetMetrics(): Unit = {
     metrics.valuesIterator.foreach(_.reset())
   }
 
   /**
-   * Return a LongSQLMetric according to the name.
+   * @return [[SQLMetric]] for the `name`.
    */
   def longMetric(name: String): SQLMetric = metrics(name)
 
@@ -128,7 +128,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   /**
-   * Execute a query after preparing the query and adding query plan information to created RDDs
+   * Executes a query after preparing the query and adding query plan information to created RDDs
    * for visualization.
    */
   protected final def executeQuery[T](query: => T): T = {
@@ -176,7 +176,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   private var prepared = false
 
   /**
-   * Prepare a SparkPlan for execution. It's idempotent.
+   * Prepares this SparkPlan for execution. It's idempotent.
    */
   final def prepare(): Unit = {
     // doPrepare() may depend on it's children, we should call prepare() on all the children first.
@@ -195,22 +195,24 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * `execute` of SparkPlan. This is helpful if we want to set up some state before executing the
    * query, e.g., `BroadcastHashJoin` uses it to broadcast asynchronously.
    *
-   * Note: the prepare method has already walked down the tree, so the implementation doesn't need
-   * to call children's prepare methods.
+   * @note The `prepare` method has already walked down the tree, so the implementation doesn't
+   * have to call children's `prepare` methods.
    *
    * This will only be called once, protected by `this`.
    */
   protected def doPrepare(): Unit = {}
 
   /**
+   * Produces the result of the query as an `RDD[InternalRow]`.
+   *
    * Overridden by concrete implementations of SparkPlan.
-   * Produces the result of the query as an RDD[InternalRow]
    */
   protected def doExecute(): RDD[InternalRow]
 
   /**
-   * Overridden by concrete implementations of SparkPlan.
    * Produces the result of the query as a broadcast variable.
+   *
+   * Overridden by concrete implementations of SparkPlan.
    */
   protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
     throw new UnsupportedOperationException(s"$nodeName does not implement doExecuteBroadcast")
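For orientation, a minimal leaf operator that supplies `doExecute`, sketched against Spark 2.x internals (`OneRowExec` is a made-up name; executing it requires an active `SparkSession`):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.LeafExecNode

// Produces a single empty row in one partition; `output` is empty because
// the row has no columns.
case class OneRowExec() extends LeafExecNode {
  override def output: Seq[Attribute] = Nil

  protected override def doExecute(): RDD[InternalRow] =
    sparkContext.parallelize(Seq(InternalRow.empty), 1)
}
```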
@@ -245,7 +247,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   /**
-   * Decode the byte arrays back to UnsafeRows and put them into buffer.
+   * Decodes the byte arrays back to UnsafeRows and puts them into a buffer.
    */
   private def decodeUnsafeRows(bytes: Array[Byte]): Iterator[InternalRow] = {
     val nFields = schema.length
@@ -284,7 +286,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /**
    * Runs this query returning the result as an iterator of InternalRow.
    *
-   * Note: this will trigger multiple jobs (one for each partition).
+   * @note Triggers multiple jobs (one for each partition).
    */
   def executeToIterator(): Iterator[InternalRow] = {
     getByteArrayRdd().toLocalIterator.flatMap(decodeUnsafeRows)
@@ -301,7 +303,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /**
    * Runs this query returning the first `n` rows as an array.
    *
-   * This is modeled after RDD.take but never runs any job locally on the driver.
+   * This is modeled after `RDD.take` but never runs any job locally on the driver.
    */
   def executeTake(n: Int): Array[InternalRow] = {
     if (n == 0) {
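The incremental partition scanning is observable through the developer-facing `executedPlan`; a sketch, assuming a local session:

```scala
import org.apache.spark.sql.SparkSession

object ExecuteTakeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("take-demo").getOrCreate()
    val plan = spark.range(0, 1000, 1, numPartitions = 10).queryExecution.executedPlan
    // Scans a few partitions first and widens the scan only if fewer than
    // `n` rows were found; it never runs the job locally on the driver.
    plan.executeTake(5).foreach(println)
    spark.stop()
  }
}
```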
@@ -70,7 +70,7 @@ trait CodegenSupport extends SparkPlan {
   /**
    * Returns all the RDDs of InternalRow which generates the input rows.
    *
-   * Note: right now we support up to two RDDs.
+   * @note Right now we support up to two RDDs.
    */
   def inputRDDs(): Seq[RDD[InternalRow]]
 
@@ -227,7 +227,7 @@ trait CodegenSupport extends SparkPlan {
 
 
 /**
- * InputAdapter is used to hide a SparkPlan from a subtree that support codegen.
+ * InputAdapter is used to hide a SparkPlan from a subtree that supports codegen.
  *
  * This is the leaf node of a tree with WholeStageCodegen that is used to generate code
  * that consumes an RDD iterator of InternalRow.
@@ -282,10 +282,10 @@ object WholeStageCodegenExec {
 }
 
 /**
- * WholeStageCodegen compile a subtree of plans that support codegen together into single Java
+ * WholeStageCodegen compiles a subtree of plans that support codegen together into a single Java
  * function.
  *
- * Here is the call graph of to generate Java source (plan A support codegen, but plan B does not):
+ * Here is the call graph to generate Java source (plan A supports codegen, but plan B does not):
  *
  *   WholeStageCodegen       Plan A          FakeInput        Plan B
  * =========================================================================
@@ -304,10 +304,10 @@ object WholeStageCodegenExec {
  *                                                              |
  *                                               doConsume() <-------- consume()
  *
- * SparkPlan A should override doProduce() and doConsume().
+ * SparkPlan A should override `doProduce()` and `doConsume()`.
  *
- * doCodeGen() will create a CodeGenContext, which will hold a list of variables for input,
- * used to generated code for BoundReference.
+ * `doCodeGen()` will create a `CodeGenContext`, which will hold a list of variables for input,
+ * used to generate code for [[BoundReference]].
  */
 case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
 
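To see the compiled subtree boundaries and the generated Java function for a query, the `debug` package offers `debugCodegen()` (a sketch, assuming a local session):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._

object CodegenDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("codegen-demo").getOrCreate()
    val df = spark.range(10).selectExpr("id * 2 AS doubled").filter("doubled > 5")
    // Prints the physical plan, with whole-stage-codegen subtrees marked by `*`,
    // followed by the Java source generated for each subtree.
    df.debugCodegen()
    spark.stop()
  }
}
```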