@@ -60,8 +60,6 @@ protected long getUsed() {

/**
* Force spill during building.
*
Member:

Why remove this?

Contributor Author:

It's not "for testing only". I'd even say it's used more often in non-test code than in test code, which made the comment incorrect. See ShuffleExternalSorter and UnsafeShuffleWriter, both of which are ShuffleWriter instances.

* For testing.
*/
public void spill() throws IOException {
spill(Long.MAX_VALUE, this);
@@ -52,16 +52,15 @@
* This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
* writes incoming records to separate files, one file per reduce partition, then concatenates these
* per-partition files to form a single output file, regions of which are served to reducers.
* Records are not buffered in memory. This is essentially identical to
* {@link org.apache.spark.shuffle.hash.HashShuffleWriter}, except that it writes output in a format
* Records are not buffered in memory. It writes output in a format
* that can be served / consumed via {@link org.apache.spark.shuffle.IndexShuffleBlockResolver}.
* <p>
* This write path is inefficient for shuffles with large numbers of reduce partitions because it
* simultaneously opens separate serializers and file streams for all partitions. As a result,
* {@link SortShuffleManager} only selects this write path when
* <ul>
* <li>no Ordering is specified,</li>
* <li>no Aggregator is specific, and</li>
* <li>no Aggregator is specified, and</li>
* <li>the number of partitions is less than
* <code>spark.shuffle.sort.bypassMergeThreshold</code>.</li>
* </ul>
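Note: the selection criteria above amount to a small check on the shuffle dependency. A hedged sketch of that logic (the authoritative version is `SortShuffleWriter.shouldBypassMergeSort`; the ordering condition is handled by the caller and omitted here, and the default threshold of 200 is an assumption):

```scala
import org.apache.spark.{ShuffleDependency, SparkConf}

// Sketch: the bypass path is only viable when there is no map-side combine
// (which would require an Aggregator) and the number of reduce partitions is
// at or below spark.shuffle.sort.bypassMergeThreshold.
def bypassMergeSortViable(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  if (dep.mapSideCombine) {
    false
  } else {
    val threshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= threshold
  }
}
```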
@@ -71,13 +71,12 @@ private[spark] trait ExecutorAllocationClient {

/**
* Request that the cluster manager kill every executor on the specified host.
* Results in a call to killExecutors for each executor on the host, with the replace
* and force arguments set to true.
*
* @return whether the request is acknowledged by the cluster manager.
*/
def killExecutorsOnHost(host: String): Boolean

/**
/**
* Request that the cluster manager kill the specified executor.
* @return whether the request is acknowledged by the cluster manager.
*/
core/src/main/scala/org/apache/spark/scheduler/Task.scala (2 changes: 1 addition & 1 deletion)
@@ -149,7 +149,7 @@ private[spark] abstract class Task[T](

def preferredLocations: Seq[TaskLocation] = Nil

// Map output tracker epoch. Will be set by TaskScheduler.
// Map output tracker epoch. Will be set by TaskSetManager.
var epoch: Long = -1

// Task context, to be initialized in run().
@@ -77,7 +77,7 @@ abstract class Serializer {
* position = 0
* serOut.write(obj1)
* serOut.flush()
* position = # of bytes writen to stream so far
* position = # of bytes written to stream so far
* obj1Bytes = output[0:position-1]
* serOut.write(obj2)
* serOut.flush()
@@ -95,8 +95,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
// Sort the output if there is a sort ordering defined.
dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =>
// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
// the ExternalSorter won't spill to disk.
// Create an ExternalSorter to sort the data.
val sorter =
new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
sorter.insertAll(aggregatedIter)
@@ -61,7 +61,7 @@ private[spark] class IndexShuffleBlockResolver(

/**
* Remove data file and index file that contain the output data from one map.
* */
*/
def removeDataByMap(shuffleId: Int, mapId: Int): Unit = {
var file = getDataFile(shuffleId, mapId)
if (file.exists()) {
@@ -132,7 +132,7 @@ private[spark] class IndexShuffleBlockResolver(
* replace them with new ones.
*
* Note: the `lengths` will be updated to match the existing index file if use the existing ones.
* */
*/
def writeIndexFileAndCommit(
shuffleId: Int,
mapId: Int,
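Note: as background for `writeIndexFileAndCommit`, the index file is essentially a list of cumulative byte offsets into the single data file, a leading zero plus one entry per partition, derived from the per-partition `lengths`. A hedged sketch of writing such an index (illustration only; the resolver's real code also handles temporary files, the existing-index check, and rollback):

```scala
import java.io.{DataOutputStream, File, FileOutputStream}

// offset(i) is where partition i starts in the data file;
// offset(numPartitions) is the total length of the data file.
def writeIndex(indexFile: File, lengths: Array[Long]): Unit = {
  val out = new DataOutputStream(new FileOutputStream(indexFile))
  try {
    var offset = 0L
    out.writeLong(offset)
    for (length <- lengths) {
      offset += length
      out.writeLong(offset)
    }
  } finally {
    out.close()
  }
}
```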
@@ -82,13 +82,13 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)

/**
* Register a shuffle with the manager and obtain a handle for it to pass to tasks.
* Obtains a [[ShuffleHandle]] to pass to tasks.
Member:

Why remove some of the docs in instances like this? It's not obvious it was superfluous.

Contributor Author:

It's a copy from the main method this one overrides. Since the method does not do what the scaladoc said, I thought I'd make it current. It might have "Register(ed) a shuffle with the manager" in the past, but not today. It was misleading.

*/
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
// need map-side aggregation, then write numPartitions files directly and just concatenate
// them at the end. This avoids doing serialization and deserialization twice to merge
@@ -279,7 +279,7 @@ private[spark] object AccumulatorContext {


/**
* An [[AccumulatorV2 accumulator]] for computing sum, count, and averages for 64-bit integers.
* An [[AccumulatorV2 accumulator]] for computing sum, count, and average of 64-bit integers.
*
* @since 2.0.0
*/
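Note: a short usage sketch of the accumulator described above (assumes a live SparkContext named `sc`; the accumulator name is illustrative):

```scala
import org.apache.spark.SparkContext

def accumulatorExample(sc: SparkContext): Unit = {
  val acc = sc.longAccumulator("myCounter")
  sc.parallelize(1L to 100L).foreach(x => acc.add(x))
  // The same accumulator tracks sum, count, and average.
  println(s"sum=${acc.sum} count=${acc.count} avg=${acc.avg}")
}
```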
@@ -30,7 +30,7 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.util.Utils

/**
* An example of how to use [[org.apache.spark.sql.DataFrame]] for ML. Run with
* An example of how to use [[DataFrame]] for ML. Run with
* {{{
* ./bin/run-example ml.DataFrameExample [options]
* }}}
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.trees.CurrentOrigin
/**
* Collection of rules related to hints. The only hint currently available is broadcast join hint.
*
* Note that this is separatedly into two rules because in the future we might introduce new hint
* Note that this is separately into two rules because in the future we might introduce new hint
* rules that have different ordering requirements from broadcast.
*/
object ResolveHints {
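Note: for context, the broadcast join hint that these rules resolve can be supplied through the DataFrame API or as a SQL hint comment. A sketch (assumes a SparkSession named `spark` and registered views named `large` and `small` that share an `id` column; all names are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

def broadcastHintExample(spark: SparkSession): Unit = {
  val large = spark.table("large")
  val small = spark.table("small")

  // DataFrame API: mark the smaller side for broadcast.
  val joined = large.join(broadcast(small), "id")

  // SQL: the hint comment is picked up during analysis.
  val joinedSql = spark.sql(
    "SELECT /*+ BROADCAST(small) */ * FROM large JOIN small ON large.id = small.id")

  joined.explain()
  joinedSql.explain()
}
```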
@@ -229,9 +229,9 @@ case class ExpressionEncoder[T](
// serializer expressions are used to encode an object to a row, while the object is usually an
// intermediate value produced inside an operator, not from the output of the child operator. This
// is quite different from normal expressions, and `AttributeReference` doesn't work here
// (intermediate value is not an attribute). We assume that all serializer expressions use a same
// `BoundReference` to refer to the object, and throw exception if they don't.
assert(serializer.forall(_.references.isEmpty), "serializer cannot reference to any attributes.")
// (intermediate value is not an attribute). We assume that all serializer expressions use the
// same `BoundReference` to refer to the object, and throw exception if they don't.
assert(serializer.forall(_.references.isEmpty), "serializer cannot reference any attributes.")
assert(serializer.flatMap { ser =>
val boundRefs = ser.collect { case b: BoundReference => b }
assert(boundRefs.nonEmpty,
@@ -491,7 +491,7 @@ abstract class BinaryExpression extends Expression {
* A [[BinaryExpression]] that is an operator, with two properties:
*
* 1. The string representation is "x symbol y", rather than "funcName(x, y)".
* 2. Two inputs are expected to the be same type. If the two inputs have different types,
* 2. Two inputs are expected to be of the same type. If the two inputs have different types,
* the analyzer will find the tightest common type and do the proper type casting.
*/
abstract class BinaryOperator extends BinaryExpression with ExpectsInputTypes {
@@ -75,13 +75,6 @@ case class WindowSpecDefinition(
frameSpecification.isInstanceOf[SpecifiedWindowFrame]

override def nullable: Boolean = true
override def foldable: Boolean = false
override def dataType: DataType = throw new UnsupportedOperationException

override def sql: String = {
@@ -695,7 +694,7 @@ case class DenseRank(children: Seq[Expression]) extends RankLike {
*
* This documentation has been based upon similar documentation for the Hive and Presto projects.
*
* @param children to base the rank on; a change in the value of one the children will trigger a
* @param children to base the rank on; a change in the value of one of the children will trigger a
* change in rank. This is an internal parameter and will be assigned by the
* Analyser.
*/
@@ -65,7 +65,7 @@ object EliminateSerialization extends Rule[LogicalPlan] {

/**
* Combines two adjacent [[TypedFilter]]s, which operate on same type object in condition, into one,
* mering the filter functions into one conjunctive function.
* merging the filter functions into one conjunctive function.
*/
object CombineTypedFilters extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
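Note: to illustrate the effect of CombineTypedFilters, two adjacent typed filters on the same element type collapse into a single filter whose predicate is the conjunction of the two functions. A sketch of the observable behavior (assumes a SparkSession named `spark`):

```scala
import org.apache.spark.sql.SparkSession

def combineTypedFiltersExample(spark: SparkSession): Unit = {
  import spark.implicits._

  val ds = Seq(1, 2, 3, 4, 5).toDS()

  // Two adjacent typed filters on Int...
  val chained = ds.filter(_ % 2 == 0).filter(_ > 2)

  // ...are optimized into roughly this single conjunctive filter.
  val combined = ds.filter(x => x % 2 == 0 && x > 2)

  chained.explain(true)   // the optimized plan should show a single filter
  combined.explain(true)
}
```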
@@ -492,7 +492,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
}

/**
* Add an [[Aggregate]] to a logical plan.
* Add an [[Aggregate]] or [[GroupingSets]] to a logical plan.
Member:

Probably OK, but it's harder to know whether this is correct. I'm also aware that many [[xxx]] links actually fail to render in javadoc 8 -- have you tested the ones you added?

Contributor Author:

I didn't check the javadoc, and the change follows Aggregate. I'll see how to generate the javadoc and check this out (as I've been meaning to do anyway for some time). Thanks for the inspiration!

Contributor Author:

The package org.apache.spark.sql.catalyst.parser, and hence AstBuilder, doesn't show up in the javadoc/scaladoc.

*/
private def withAggregation(
ctx: AggregationContext,
@@ -519,7 +519,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
}

/**
* Add a Hint to a logical plan.
* Add a [[Hint]] to a logical plan.
*/
private def withHints(
ctx: HintContext,
@@ -545,7 +545,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
}

/**
* Create a single relation referenced in a FROM claused. This method is used when a part of the
* Create a single relation referenced in a FROM clause. This method is used when a part of the
* join condition is nested, for example:
* {{{
* select * from t1 join (t2 cross join t3) on col1 = col2
@@ -230,14 +230,15 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
def producedAttributes: AttributeSet = AttributeSet.empty

/**
* Attributes that are referenced by expressions but not provided by this nodes children.
* Attributes that are referenced by expressions but not provided by this node's children.
* Subclasses should override this method if they produce attributes internally as it is used by
* assertions designed to prevent the construction of invalid plans.
*/
def missingInput: AttributeSet = references -- inputSet -- producedAttributes

/**
* Runs [[transform]] with `rule` on all expressions present in this query operator.
* Runs [[transformExpressionsDown]] with `rule` on all expressions present
* in this query operator.
* Users should not expect a specific directionality. If a specific directionality is needed,
* transformExpressionsDown or transformExpressionsUp should be used.
*
@@ -32,7 +32,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
private var _analyzed: Boolean = false

/**
* Marks this plan as already analyzed. This should only be called by CheckAnalysis.
* Marks this plan as already analyzed. This should only be called by [[CheckAnalysis]].
*/
private[catalyst] def setAnalyzed(): Unit = { _analyzed = true }

@@ -26,7 +26,8 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval

/**
* Test basic expression parsing. If a type of expression is supported it should be tested here.
* Test basic expression parsing.
* If the type of an expression is supported it should be tested here.
*
* Please note that some of the expressions test don't have to be sound expressions, only their
* structure needs to be valid. Unsound expressions should be caught by the Analyzer or
@@ -24,7 +24,8 @@ import org.apache.spark.annotation.InterfaceStability
*
* To use this, import implicit conversions in SQL:
* {{{
* import sqlContext.implicits._
* val spark: SparkSession = ...
* import spark.implicits._
* }}}
*
* @since 1.6.0
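Note: a fuller usage sketch of the corrected snippet above (object, app, and column names are illustrative):

```scala
import org.apache.spark.sql.SparkSession

object ImplicitsExample {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .appName("ImplicitsExample")
      .master("local[*]")
      .getOrCreate()

    // The implicits live on the session instance, so the session must exist first.
    import spark.implicits._

    val ds = Seq(("a", 1), ("b", 2)).toDS()   // toDS comes from spark.implicits._
    ds.toDF("key", "value").show()

    spark.stop()
  }
}
```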
@@ -60,7 +60,7 @@ import org.apache.spark.util.Utils
* The builder can also be used to create a new session:
*
* {{{
* SparkSession.builder()
* SparkSession.builder
* .master("local")
* .appName("Word Count")
* .config("spark.some.config.option", "some-value")
@@ -323,7 +323,7 @@ class SparkSession private(
* // |-- age: integer (nullable = true)
*
* dataFrame.createOrReplaceTempView("people")
* sparkSession.sql("select name from people").collect.foreach(println)
* sparkSession.sql("select name from people").show
Member:

I imagine this is an OK modification, but it's not really a typo fix. I'd avoid changes that aren't fixing problems.

Contributor Author:

I agree. Just for the record, collect.foreach(println) should not be "endorsed" in Spark 2's official docs, as it's too RDD-ish.

* }}}
*
* @since 2.0.0
@@ -24,7 +24,7 @@ import org.apache.spark.sql.types.StringType

/**
* A command for users to list the databases/schemas.
* If a databasePattern is supplied then the databases that only matches the
* If a databasePattern is supplied then the databases that only match the
* pattern would be listed.
* The syntax of using this command in SQL is:
* {{{
@@ -25,7 +25,7 @@ import org.apache.spark.sql.types.StructType
* monotonically increasing notion of progress that can be represented as an [[Offset]]. Spark
* will regularly query each [[Source]] to see if any more data is available.
*/
trait Source {
trait Source {

/** Returns the schema of the data from this source */
def schema: StructType
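Note: beyond `schema` shown above, the Source contract in the 2.x line also covers offsets and batches. A hedged outline for orientation (signatures recalled from the 2.x API; check the actual trait in org.apache.spark.sql.execution.streaming before relying on them):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Offset
import org.apache.spark.sql.types.StructType

// Outline only; not the real trait.
trait SourceOutline {
  /** Schema of the data produced by this source. */
  def schema: StructType
  /** Highest offset for which data is currently available, if any. */
  def getOffset: Option[Offset]
  /** Data between `start` (exclusive, None for the first batch) and `end` (inclusive). */
  def getBatch(start: Option[Offset], end: Offset): DataFrame
  /** Informs the source that data up to `end` has been processed and may be discarded. */
  def commit(end: Offset): Unit = {}
  /** Stops this source and frees any resources it holds. */
  def stop(): Unit
}
```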
@@ -2967,7 +2967,7 @@ object functions {
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string
* @param options options to control how the json is parsed. accepts the same options and the
* @param options options to control how the json is parsed. Accepts the same options as the
* json data source.
*
* @group collection_funcs
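Note: a usage sketch of the from_json overload documented above (assumes a DataFrame `df` with a string column named `json`; the schema fields are illustrative and the option key shown is one of the JSON data source options):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

def parseJsonColumn(df: DataFrame): DataFrame = {
  val schema = StructType(Seq(
    StructField("name", StringType),
    StructField("age", IntegerType)))

  // The options map accepts the same keys as the JSON data source,
  // e.g. allowUnquotedFieldNames or timestampFormat.
  df.withColumn("parsed",
    from_json(col("json"), schema, Map("allowUnquotedFieldNames" -> "true")))
}
```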
@@ -66,7 +66,7 @@ private[streaming] class InputInfoTracker(ssc: StreamingContext) extends Logging
new mutable.HashMap[Int, StreamInputInfo]())

if (inputInfos.contains(inputInfo.inputStreamId)) {
throw new IllegalStateException(s"Input stream ${inputInfo.inputStreamId} for batch" +
throw new IllegalStateException(s"Input stream ${inputInfo.inputStreamId} for batch " +
s"$batchTime is already added into InputInfoTracker, this is an illegal state")
}
inputInfos += ((inputInfo.inputStreamId, inputInfo))
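Note: the added trailing space matters because the two interpolated fragments are concatenated directly. A minimal illustration:

```scala
val inputStreamId = 3
val batchTime = 1000

// Without the trailing space: "Input stream 3 for batch1000 is already added ..."
val withoutSpace = s"Input stream $inputStreamId for batch" +
  s"$batchTime is already added into InputInfoTracker"

// With the trailing space the message reads correctly:
// "Input stream 3 for batch 1000 is already added ..."
val withSpace = s"Input stream $inputStreamId for batch " +
  s"$batchTime is already added into InputInfoTracker"
```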