Changes from all commits (63 commits)
a39214b  spelling: adaptive (jsoref, Nov 11, 2020)
0984351  spelling: aggregated (jsoref, Nov 11, 2020)
9c55a27  spelling: available (jsoref, Nov 11, 2020)
ef107be  spelling: batches (jsoref, Nov 11, 2020)
b1db4b2  spelling: bottom (jsoref, Nov 11, 2020)
eade260  spelling: calendar (jsoref, Nov 11, 2020)
a53c5b5  spelling: captures (jsoref, Nov 11, 2020)
23a0c19  spelling: catalog (jsoref, Nov 11, 2020)
be4a9d6  spelling: characters (jsoref, Nov 11, 2020)
d76a947  spelling: checkpoint (jsoref, Nov 11, 2020)
912a325  spelling: codegened (jsoref, Nov 11, 2020)
42b0b76  spelling: columnar (jsoref, Nov 11, 2020)
a314466  spelling: common (jsoref, Nov 11, 2020)
354198f  spelling: description (jsoref, Nov 11, 2020)
3769101  spelling: deterministic (jsoref, Nov 11, 2020)
612fece  spelling: distinct (jsoref, Nov 11, 2020)
11d3d70  spelling: do not (jsoref, Nov 11, 2020)
1fa7ac3  spelling: doesn't (jsoref, Nov 11, 2020)
097946c  spelling: don't (jsoref, Nov 11, 2020)
48e871e  spelling: e.g. (jsoref, Nov 11, 2020)
94be7f1  spelling: everything (jsoref, Nov 11, 2020)
a83fd61  spelling: execution (jsoref, Nov 11, 2020)
a8ceb06  spelling: explains (jsoref, Nov 11, 2020)
a71ae42  spelling: february (jsoref, Nov 11, 2020)
cc71a63  spelling: first (jsoref, Nov 11, 2020)
bb4da1c  spelling: global (jsoref, Nov 11, 2020)
44de7dd  spelling: grouped (jsoref, Nov 11, 2020)
b6c4e18  spelling: identifier (jsoref, Nov 11, 2020)
26fda2b  spelling: immutable (jsoref, Nov 11, 2020)
9d17aa4  spelling: location (jsoref, Nov 11, 2020)
98339fd  spelling: matches (jsoref, Nov 11, 2020)
74d83cb  spelling: maybe (jsoref, Nov 11, 2020)
a479fdc  spelling: multiple (jsoref, Nov 11, 2020)
49fdba9  spelling: namespace (jsoref, Nov 11, 2020)
f791570  spelling: nonexistent (jsoref, Nov 11, 2020)
bbaa8e0  spelling: nonexistenttable (jsoref, Nov 11, 2020)
3ecbf8f  spelling: operator (jsoref, Nov 11, 2020)
ad59b22  spelling: order (jsoref, Nov 11, 2020)
642f347  spelling: overridden (jsoref, Nov 11, 2020)
57d7f21  spelling: partition (jsoref, Nov 11, 2020)
f57061c  spelling: pattern (jsoref, Nov 11, 2020)
fc2302f  spelling: percentage (jsoref, Nov 11, 2020)
1af2ee6  spelling: property (jsoref, Nov 11, 2020)
d0fff8a  spelling: provider (jsoref, Nov 11, 2020)
1919d17  spelling: queries (jsoref, Nov 11, 2020)
756e9c9  spelling: reordered (jsoref, Nov 11, 2020)
0bd9782  spelling: replace (jsoref, Nov 11, 2020)
8409f83  spelling: rolled up (jsoref, Nov 11, 2020)
1a59866  spelling: schema (jsoref, Nov 11, 2020)
ffd6c61  spelling: separator (jsoref, Nov 11, 2020)
e50bfb0  spelling: spark (jsoref, Nov 11, 2020)
5456f86  spelling: stream (jsoref, Nov 11, 2020)
5478501  spelling: subqueries (jsoref, Nov 11, 2020)
69ba303  spelling: succeeded (jsoref, Nov 11, 2020)
1013da5  spelling: temporary (jsoref, Nov 11, 2020)
43da92e  spelling: the (jsoref, Nov 11, 2020)
b412923  spelling: triggered (jsoref, Nov 11, 2020)
78f0ef0  spelling: unexpected (jsoref, Nov 11, 2020)
da4faf8  spelling: unknown (jsoref, Nov 11, 2020)
40f97c8  spelling: validate (jsoref, Nov 11, 2020)
4715490  spelling: warning (jsoref, Nov 11, 2020)
d3d6324  spelling: whether (jsoref, Nov 11, 2020)
db599a5  style: line length exceeds 100 characters (jsoref, Nov 15, 2020)
@@ -87,14 +87,14 @@ function preprocessGraphLayout(g) {
var node = g.node(nodes[i]);
node.padding = "5";

var firstSearator;
var firstSeparator;
var secondSeparator;
var splitter;
if (node.isCluster) {
firstSearator = secondSeparator = labelSeparator;
firstSeparator = secondSeparator = labelSeparator;
splitter = "\\n";
} else {
firstSearator = "<span class='stageId-and-taskId-metrics'>";
firstSeparator = "<span class='stageId-and-taskId-metrics'>";
secondSeparator = "</span>";
splitter = "<br>";
}
@@ -104,7 +104,7 @@ function preprocessGraphLayout(g) {
if (newTexts) {
node.label = node.label.replace(
newTexts[0],
newTexts[1] + firstSearator + newTexts[2] + secondSeparator + newTexts[3]);
newTexts[1] + firstSeparator + newTexts[2] + secondSeparator + newTexts[3]);
}
});
}
10 changes: 5 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1363,7 +1363,7 @@ class Dataset[T] private[sql](
// Attach the dataset id and column position to the column reference, so that we can detect
// ambiguous self-join correctly. See the rule `DetectAmbiguousSelfJoin`.
// This must be called before we return a `Column` that contains `AttributeReference`.
// Note that, the metadata added here are only avaiable in the analyzer, as the analyzer rule
// Note that, the metadata added here are only available in the analyzer, as the analyzer rule
// `DetectAmbiguousSelfJoin` will remove it.
private def addDataFrameIdToCol(expr: NamedExpression): NamedExpression = {
val newExpr = expr transform {
@@ -1665,10 +1665,10 @@ class Dataset[T] private[sql](
* See [[RelationalGroupedDataset]] for all the available aggregate functions.
*
* {{{
* // Compute the average for all numeric columns rolluped by department and group.
* // Compute the average for all numeric columns rolled up by department and group.
* ds.rollup($"department", $"group").avg()
*
* // Compute the max age and average salary, rolluped by department and gender.
* // Compute the max age and average salary, rolled up by department and gender.
* ds.rollup($"department", $"gender").agg(Map(
* "salary" -> "avg",
* "age" -> "max"
@@ -1794,10 +1794,10 @@ class Dataset[T] private[sql](
* (i.e. cannot construct expressions).
*
* {{{
* // Compute the average for all numeric columns rolluped by department and group.
* // Compute the average for all numeric columns rolled up by department and group.
* ds.rollup("department", "group").avg()
*
* // Compute the max age and average salary, rolluped by department and gender.
* // Compute the max age and average salary, rolled up by department and gender.
* ds.rollup($"department", $"gender").agg(Map(
* "salary" -> "avg",
* "age" -> "max"
@@ -284,7 +284,7 @@ case class FileSourceScanExec(
//
// Sort ordering would be over the prefix subset of `sort columns` being read
// from the table.
// eg.
// e.g.
// Assume (col0, col2, col3) are the columns read from the table
// If sort columns are (col0, col1), then sort ordering would be considered as (col0)
// If sort columns are (col1, col0), then sort ordering would be empty as per rule #2
@@ -379,12 +379,12 @@ case class FileSourceScanExec(
case (key, _) if (key.equals("Location")) =>
val location = relation.location
val numPaths = location.rootPaths.length
val abbreviatedLoaction = if (numPaths <= 1) {
val abbreviatedLocation = if (numPaths <= 1) {
location.rootPaths.mkString("[", ", ", "]")
} else {
"[" + location.rootPaths.head + s", ... ${numPaths - 1} entries]"
}
s"$key: ${location.getClass.getSimpleName} ${redact(abbreviatedLoaction)}"
s"$key: ${location.getClass.getSimpleName} ${redact(abbreviatedLocation)}"
case (key, value) => s"$key: ${redact(value)}"
}

@@ -28,14 +28,14 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveS
object ExplainUtils extends AdaptiveSparkPlanHelper {
/**
* Given a input physical plan, performs the following tasks.
* 1. Computes the operator id for current operator and records it in the operaror
* 1. Computes the operator id for current operator and records it in the operator
* by setting a tag.
* 2. Computes the whole stage codegen id for current operator and records it in the
* operator by setting a tag.
* 3. Generate the two part explain output for this plan.
* 1. First part explains the operator tree with each operator tagged with an unique
* identifier.
* 2. Second part explans each operator in a verbose manner.
* 2. Second part explains each operator in a verbose manner.
*
* Note : This function skips over subqueries. They are handled by its caller.
*
@@ -117,7 +117,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
}

/**
* Traverses the supplied input plan in a bottem-up fashion does the following :
* Traverses the supplied input plan in a bottom-up fashion does the following :
* 1. produces a map : operator identifier -> operator
* 2. Records the operator id via setting a tag in the operator.
* Note :
@@ -210,7 +210,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {

/**
* Given a input plan, returns an array of tuples comprising of :
* 1. Hosting opeator id.
* 1. Hosting operator id.
* 2. Hosting expression
* 3. Subquery plan
*/
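
For context, the two-part output documented here is what the formatted explain mode prints. A minimal sketch, assuming Spark 3.0+ and an active SparkSession named `spark`; the query itself is illustrative.

```scala
// Part 1 of the output: the operator tree, each operator tagged with a unique id.
// Part 2: per-operator details, keyed by those operator ids.
val df = spark.range(100)
  .selectExpr("id % 3 AS k", "id AS v")
  .groupBy("k")
  .count()

df.explain("formatted")
```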
@@ -87,7 +87,7 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
def isEmpty: Boolean = numRows == 0

/**
* Clears up resources (eg. memory) held by the backing storage
* Clears up resources (e.g. memory) held by the backing storage
*/
def clear(): Unit = {
if (spillableArray != null) {
@@ -386,25 +386,25 @@ class SparkSqlAstBuilder extends AstBuilder {
* - '/path/to/fileOrJar'
*/
override def visitManageResource(ctx: ManageResourceContext): LogicalPlan = withOrigin(ctx) {
val mayebePaths = if (ctx.STRING != null) string(ctx.STRING) else remainder(ctx.identifier).trim
val maybePaths = if (ctx.STRING != null) string(ctx.STRING) else remainder(ctx.identifier).trim
ctx.op.getType match {
case SqlBaseParser.ADD =>
ctx.identifier.getText.toLowerCase(Locale.ROOT) match {
case "file" => AddFileCommand(mayebePaths)
case "jar" => AddJarCommand(mayebePaths)
case "file" => AddFileCommand(maybePaths)
case "jar" => AddJarCommand(maybePaths)
case other => operationNotAllowed(s"ADD with resource type '$other'", ctx)
}
case SqlBaseParser.LIST =>
ctx.identifier.getText.toLowerCase(Locale.ROOT) match {
case "files" | "file" =>
if (mayebePaths.length > 0) {
ListFilesCommand(mayebePaths.split("\\s+"))
if (maybePaths.length > 0) {
ListFilesCommand(maybePaths.split("\\s+"))
} else {
ListFilesCommand()
}
case "jars" | "jar" =>
if (mayebePaths.length > 0) {
ListJarsCommand(mayebePaths.split("\\s+"))
if (maybePaths.length > 0) {
ListJarsCommand(maybePaths.split("\\s+"))
} else {
ListJarsCommand()
}
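
The grammar handled by `visitManageResource` corresponds to the ADD and LIST resource commands. A small sketch of that surface, assuming an active SparkSession named `spark`; the paths are illustrative.

```scala
// ADD registers a resource with the session; LIST shows what has been registered so far.
spark.sql("ADD JAR /tmp/extra-udfs.jar")
spark.sql("ADD FILE /tmp/lookup.csv")
spark.sql("LIST JARS").show(truncate = false)
spark.sql("LIST FILES").show(truncate = false)
```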
@@ -670,7 +670,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
}

${ctx.registerComment(
s"""Codegend pipeline for stage (id=$codegenStageId)
s"""Codegened pipeline for stage (id=$codegenStageId)
|${this.treeString.trim}""".stripMargin,
"wsc_codegenPipeline")}
${ctx.registerComment(s"codegenStageId=$codegenStageId", "wsc_codegenStageId", true)}
@@ -115,7 +115,7 @@ trait AdaptiveSparkPlanHelper {

/**
* Returns a sequence containing the subqueries in this plan, also including the (nested)
* subquries in its children
* subqueries in its children
*/
def subqueriesAll(p: SparkPlan): Seq[SparkPlan] = {
val subqueries = flatMap(p)(_.subqueries)
@@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources._
* @param storage storage format used to describe how the query result is stored.
* @param provider the data source type to be used
* @param query the logical plan representing data to write to
* @param overwrite whthere overwrites existing directory
* @param overwrite whether overwrites existing directory
*/
case class InsertIntoDataSourceDirCommand(
storage: CatalogStorageFormat,
@@ -89,8 +89,8 @@ case class CreateDatabaseCommand(
* A command for users to remove a database from the system.
*
* 'ifExists':
* - true, if database_name does't exist, no action
* - false (default), if database_name does't exist, a warning message will be issued
* - true, if database_name doesn't exist, no action
* - false (default), if database_name doesn't exist, a warning message will be issued
* 'cascade':
* - true, the dependent objects are automatically dropped before dropping database.
* - false (default), it is in the Restrict mode. The database cannot be dropped if
@@ -352,7 +352,7 @@ case class LoadDataCommand(
// entire string will be considered while making a Path instance,this is mainly done
// by considering the wild card scenario in mind.as per old logic query param is
// been considered while creating URI instance and if path contains wild card char '?'
// the remaining charecters after '?' will be removed while forming URI instance
// the remaining characters after '?' will be removed while forming URI instance
LoadDataCommand.makeQualified(defaultFS, uriPath, loadPath)
}
}
@@ -211,7 +211,7 @@ case class DataSource(
s"Unable to infer schema for $format. It must be specified manually.")
}

// We just print a waring message if the data schema and partition schema have the duplicate
// We just print a warning message if the data schema and partition schema have the duplicate
// columns. This is because we allow users to do so in the previous Spark releases and
// we have the existing tests for the cases (e.g., `ParquetHadoopFsRelationSuite`).
// See SPARK-18108 and SPARK-21144 for related discussions.
@@ -167,7 +167,7 @@ class DynamicPartitionDataWriter(

private var fileCounter: Int = _
private var recordsInFile: Long = _
private var currentPartionValues: Option[UnsafeRow] = None
private var currentPartitionValues: Option[UnsafeRow] = None
private var currentBucketId: Option[Int] = None

/** Extracts the partition values out of an input row. */
@@ -247,19 +247,19 @@ class DynamicPartitionDataWriter(
val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None
val nextBucketId = if (isBucketed) Some(getBucketId(record)) else None

if (currentPartionValues != nextPartitionValues || currentBucketId != nextBucketId) {
if (currentPartitionValues != nextPartitionValues || currentBucketId != nextBucketId) {
// See a new partition or bucket - write to a new partition dir (or a new bucket file).
if (isPartitioned && currentPartionValues != nextPartitionValues) {
currentPartionValues = Some(nextPartitionValues.get.copy())
statsTrackers.foreach(_.newPartition(currentPartionValues.get))
if (isPartitioned && currentPartitionValues != nextPartitionValues) {
currentPartitionValues = Some(nextPartitionValues.get.copy())
statsTrackers.foreach(_.newPartition(currentPartitionValues.get))
}
if (isBucketed) {
currentBucketId = nextBucketId
statsTrackers.foreach(_.newBucket(currentBucketId.get))
}

fileCounter = 0
newOutputWriter(currentPartionValues, currentBucketId)
newOutputWriter(currentPartitionValues, currentBucketId)
} else if (description.maxRecordsPerFile > 0 &&
recordsInFile >= description.maxRecordsPerFile) {
// Exceeded the threshold in terms of the number of records per file.
@@ -268,7 +268,7 @@ class DynamicPartitionDataWriter(
assert(fileCounter < MAX_FILE_COUNTER,
s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")

newOutputWriter(currentPartionValues, currentBucketId)
newOutputWriter(currentPartitionValues, currentBucketId)
}
val outputRow = getOutputRow(record)
currentWriter.write(outputRow)
@@ -164,7 +164,7 @@ object FileFormatWriter extends Logging {

SQLExecution.checkSQLExecutionId(sparkSession)

// propagate the decription UUID into the jobs, so that committers
// propagate the description UUID into the jobs, so that committers
// get an ID guaranteed to be unique.
job.getConfiguration.set("spark.sql.sources.writeJobUUID", description.uuid)

@@ -453,7 +453,7 @@ object PartitioningUtils {
val decimalTry = Try {
// `BigDecimal` conversion can fail when the `field` is not a form of number.
val bigDecimal = new JBigDecimal(raw)
// It reduces the cases for decimals by disallowing values having scale (eg. `1.1`).
// It reduces the cases for decimals by disallowing values having scale (e.g. `1.1`).
require(bigDecimal.scale <= 0)
// `DecimalType` conversion can fail when
// 1. The precision is bigger than 38.
@@ -168,7 +168,7 @@ case class ReplaceTableAsSelectExec(
* A new table will be created using the schema of the query, and rows from the query are appended.
* If the table exists, its contents and schema should be replaced with the schema and the contents
* of the query. This implementation is atomic. The table replacement is staged, and the commit
* operation at the end should perform tne replacement of the table's metadata and contents. If the
* operation at the end should perform the replacement of the table's metadata and contents. If the
* write fails, the table is instructed to roll back staged changes and any previously written table
* is left untouched.
*/
@@ -426,9 +426,9 @@ private[joins] class UnsafeHashedRelation(
readBuffer(valuesBuffer, 0, valuesSize)

val loc = binaryMap.lookup(keyBuffer, Platform.BYTE_ARRAY_OFFSET, keySize)
val putSuceeded = loc.append(keyBuffer, Platform.BYTE_ARRAY_OFFSET, keySize,
val putSucceeded = loc.append(keyBuffer, Platform.BYTE_ARRAY_OFFSET, keySize,
valuesBuffer, Platform.BYTE_ARRAY_OFFSET, valuesSize)
if (!putSuceeded) {
if (!putSucceeded) {
binaryMap.free()
throw new IOException("Could not allocate memory to grow BytesToBytesMap")
}
@@ -102,7 +102,7 @@ object ExtractGroupingPythonUDFFromAggregate extends Rule[LogicalPlan] {
case p: PythonUDF =>
// This is just a sanity check, the rule PullOutNondeterministic should
// already pull out those nondeterministic expressions.
assert(p.udfDeterministic, "Non-determinstic PythonUDFs should not appear " +
assert(p.udfDeterministic, "Non-deterministic PythonUDFs should not appear " +
"in grouping expression")
val canonicalized = p.canonicalized.asInstanceOf[PythonUDF]
if (attributeMap.contains(canonicalized)) {
@@ -174,7 +174,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
}

private def collectEvaluableUDFsFromExpressions(expressions: Seq[Expression]): Seq[PythonUDF] = {
// If fisrt UDF is SQL_SCALAR_PANDAS_ITER_UDF, then only return this UDF,
// If first UDF is SQL_SCALAR_PANDAS_ITER_UDF, then only return this UDF,
// otherwise check if subsequent UDFs are of the same type as the first UDF. (since we can only
// extract UDFs of the same eval type)

@@ -268,7 +268,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
case PythonEvalType.SQL_SCALAR_PANDAS_UDF | PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF =>
ArrowEvalPython(validUdfs, resultAttrs, child, evalType)
case _ =>
throw new AnalysisException("Unexcepted UDF evalType")
throw new AnalysisException("Unexpected UDF evalType")
}

attributeMap ++= validUdfs.map(canonicalizeDeterministic).zip(resultAttrs)
@@ -288,7 +288,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](

/**
* Delete expired log entries that proceed the currentBatchId and retain
* sufficient minimum number of batches (given by minBatchsToRetain). This
* sufficient minimum number of batches (given by minBatchesToRetain). This
* equates to retaining the earliest compaction log that proceeds
* batch id position currentBatchId + 1 - minBatchesToRetain. All log entries
* prior to the earliest compaction log proceeding that position will be removed.
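
The retention window described in this comment is driven by the minBatchesToRetain setting. A one-line sketch, assuming an active SparkSession named `spark`; the value shown is illustrative and the conf is internal.

```scala
// Corresponds to SQLConf's minBatchesToRetain (default 100); smaller values let
// expired compaction log entries be cleaned up sooner.
spark.conf.set("spark.sql.streaming.minBatchesToRetain", "50")
```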
@@ -685,6 +685,6 @@ object StreamExecution {

/**
* A special thread to run the stream query. Some codes require to run in the QueryExecutionThread
* and will use `classOf[QueryxecutionThread]` to check.
* and will use `classOf[QueryExecutionThread]` to check.
*/
abstract class QueryExecutionThread(name: String) extends UninterruptibleThread(name)
@@ -77,7 +77,7 @@ object FlatMapGroupsWithStateExecHelper {
// =========================== Private implementations of StateManager ===========================
// ===============================================================================================

/** Commmon methods for StateManager implementations */
/** Common methods for StateManager implementations */
private abstract class StateManagerImplBase(shouldStoreTimestamp: Boolean)
extends StateManager {

@@ -65,7 +65,7 @@ object HiveSerDe {
outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))

// `HiveSerDe` in `serdeMap` should be dintinct.
// `HiveSerDe` in `serdeMap` should be distinct.
val serdeInverseMap: Map[HiveSerDe, String] = serdeMap.flatMap {
case ("sequencefile", _) => None
case ("rcfile", _) => None
@@ -387,8 +387,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
}
val sink = new MemorySink()
val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink, df.schema.toAttributes))
val recoverFromChkpoint = outputMode == OutputMode.Complete()
val query = startQuery(sink, extraOptions, recoverFromCheckpoint = recoverFromChkpoint)
val recoverFromCheckpoint = outputMode == OutputMode.Complete()
val query = startQuery(sink, extraOptions, recoverFromCheckpoint = recoverFromCheckpoint)
resultDf.createOrReplaceTempView(query.name)
query
} else if (source == SOURCE_NAME_FOREACH) {
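
The branch above implements the memory-sink path, where Complete output mode is what allows recovery from the checkpoint. A minimal end-to-end sketch, assuming an active SparkSession named `spark`; the query name and checkpoint path are illustrative.

```scala
// A rate source feeding an aggregation into the in-memory sink; the query name
// becomes the temp view registered via createOrReplaceTempView in the code above.
val counts = spark.readStream
  .format("rate")
  .load()
  .selectExpr("value % 10 AS bucket")
  .groupBy("bucket")
  .count()

val query = counts.writeStream
  .format("memory")
  .queryName("rate_counts")
  .outputMode("complete")                                // Complete => recoverFromCheckpoint = true
  .option("checkpointLocation", "/tmp/rate-counts-ckpt")
  .start()

spark.sql("SELECT * FROM rate_counts").show()
query.stop()
```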