Merged pull request: changes from all commits.
core/src/main/scala/org/apache/spark/executor/Executor.scala

```diff
@@ -425,6 +425,7 @@ private[spark] class Executor(
           }
         }

+        setTaskFinishedAndClearInterruptStatus()
         execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

       } catch {
```
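Context for reviewers: `setTaskFinishedAndClearInterruptStatus()` is defined outside this hunk, so the sketch below is a hedged reading of the pattern, not the method's actual body. The idea is that the runner marks the task finished and clears its thread's interrupt flag in one synchronized step, so a `kill()` racing with normal completion cannot interrupt the `statusUpdate` call that reports `TaskState.FINISHED`.

```scala
// Hedged sketch with illustrative names, not Spark's actual members.
object InterruptClearingSketch {
  @volatile private var finished = false

  // Mark the task finished and clear this thread's interrupt flag together,
  // so a racing kill() cannot break the final FINISHED status-update RPC.
  def setTaskFinishedAndClearInterruptStatus(): Unit = synchronized {
    finished = true
    Thread.interrupted() // returns and clears the current interrupt status
  }

  def main(args: Array[String]): Unit = {
    Thread.currentThread().interrupt()      // simulate a racing kill()
    setTaskFinishedAndClearInterruptStatus()
    assert(finished && !Thread.currentThread().isInterrupted) // safe to do blocking I/O now
  }
}
```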
core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

```diff
@@ -329,13 +329,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
     val taskInfo = taskStart.taskInfo
     if (taskInfo != null) {
-      val metrics = TaskMetrics.empty
       val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
         logWarning("Task start for unknown stage " + taskStart.stageId)
         new StageUIData
       })
       stageData.numActiveTasks += 1
-      stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo, Some(metrics)))
+      stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo))
     }
     for (
       activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId);
@@ -405,7 +404,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
       }

-      val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info, None))
+      val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info))
       taskData.updateTaskInfo(info)
       taskData.updateTaskMetrics(taskMetrics)
       taskData.errorMessage = errorMessage
```
54 changes: 28 additions & 26 deletions core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala

```diff
@@ -112,9 +112,9 @@ private[spark] object UIData {
   /**
    * These are kept mutable and reused throughout a task's lifetime to avoid excessive reallocation.
    */
-  class TaskUIData private(
-      private var _taskInfo: TaskInfo,
-      private var _metrics: Option[TaskMetricsUIData]) {
+  class TaskUIData private(private var _taskInfo: TaskInfo) {
+
+    private[this] var _metrics: Option[TaskMetricsUIData] = Some(TaskMetricsUIData.EMPTY)

     var errorMessage: Option[String] = None

@@ -127,7 +127,7 @@
     }

     def updateTaskMetrics(metrics: Option[TaskMetrics]): Unit = {
-      _metrics = TaskUIData.toTaskMetricsUIData(metrics)
+      _metrics = metrics.map(TaskMetricsUIData.fromTaskMetrics)
     }

     def taskDuration: Option[Long] = {
@@ -140,28 +140,8 @@
   }

   object TaskUIData {
-    def apply(taskInfo: TaskInfo, metrics: Option[TaskMetrics]): TaskUIData = {
-      new TaskUIData(dropInternalAndSQLAccumulables(taskInfo), toTaskMetricsUIData(metrics))
-    }
-
-    private def toTaskMetricsUIData(metrics: Option[TaskMetrics]): Option[TaskMetricsUIData] = {
-      metrics.map { m =>
-        TaskMetricsUIData(
-          executorDeserializeTime = m.executorDeserializeTime,
-          executorDeserializeCpuTime = m.executorDeserializeCpuTime,
-          executorRunTime = m.executorRunTime,
-          executorCpuTime = m.executorCpuTime,
-          resultSize = m.resultSize,
-          jvmGCTime = m.jvmGCTime,
-          resultSerializationTime = m.resultSerializationTime,
-          memoryBytesSpilled = m.memoryBytesSpilled,
-          diskBytesSpilled = m.diskBytesSpilled,
-          peakExecutionMemory = m.peakExecutionMemory,
-          inputMetrics = InputMetricsUIData(m.inputMetrics),
-          outputMetrics = OutputMetricsUIData(m.outputMetrics),
-          shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
-          shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
-      }
+    def apply(taskInfo: TaskInfo): TaskUIData = {
+      new TaskUIData(dropInternalAndSQLAccumulables(taskInfo))
     }

     /**
@@ -206,6 +186,28 @@
       shuffleReadMetrics: ShuffleReadMetricsUIData,
       shuffleWriteMetrics: ShuffleWriteMetricsUIData)

+  object TaskMetricsUIData {
+    def fromTaskMetrics(m: TaskMetrics): TaskMetricsUIData = {
+      TaskMetricsUIData(
+        executorDeserializeTime = m.executorDeserializeTime,
+        executorDeserializeCpuTime = m.executorDeserializeCpuTime,
+        executorRunTime = m.executorRunTime,
+        executorCpuTime = m.executorCpuTime,
+        resultSize = m.resultSize,
+        jvmGCTime = m.jvmGCTime,
+        resultSerializationTime = m.resultSerializationTime,
+        memoryBytesSpilled = m.memoryBytesSpilled,
+        diskBytesSpilled = m.diskBytesSpilled,
+        peakExecutionMemory = m.peakExecutionMemory,
+        inputMetrics = InputMetricsUIData(m.inputMetrics),
+        outputMetrics = OutputMetricsUIData(m.outputMetrics),
+        shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
+        shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
+    }
+
+    val EMPTY: TaskMetricsUIData = fromTaskMetrics(TaskMetrics.empty)
+  }
+
   case class InputMetricsUIData(bytesRead: Long, recordsRead: Long)
   object InputMetricsUIData {
     def apply(metrics: InputMetrics): InputMetricsUIData = {
```
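The refactoring's payoff: `TaskMetricsUIData` is an immutable snapshot, so every task that has not reported metrics yet can point at one shared `TaskMetricsUIData.EMPTY` instead of allocating a fresh conversion of `TaskMetrics.empty` per task, as `onTaskStart` did before. A minimal sketch of the pattern, with simplified stand-in types rather than the real Spark classes:

```scala
// Immutable snapshots let all pending tasks share one pre-built EMPTY instance.
final case class MetricsSnapshot(runTime: Long = 0L, resultSize: Long = 0L)

object MetricsSnapshot {
  val EMPTY: MetricsSnapshot = MetricsSnapshot() // built once, shared freely
}

final class TaskRow(val taskId: Long) {
  // Start from the shared EMPTY; replace wholesale when real metrics arrive.
  private var metrics: MetricsSnapshot = MetricsSnapshot.EMPTY
  def update(m: MetricsSnapshot): Unit = metrics = m
  def current: MetricsSnapshot = metrics
}
```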
core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala

```diff
@@ -31,7 +31,7 @@ class AllStagesResourceSuite extends SparkFunSuite {
     val tasks = new LinkedHashMap[Long, TaskUIData]
     taskLaunchTimes.zipWithIndex.foreach { case (time, idx) =>
       tasks(idx.toLong) = TaskUIData(
-        new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false), None)
+        new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false))
     }

     val stageUiData = new StageUIData()
```
2 changes: 1 addition & 1 deletion docs/rdd-programming-guide.md

```diff
@@ -247,7 +247,7 @@ $ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
 To use the Jupyter notebook (previously known as the IPython notebook),

 {% highlight bash %}
-$ PYSPARK_DRIVER_PYTHON=jupyter ./bin/pyspark
+$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
 {% endhighlight %}

 You can customize the `ipython` or `jupyter` commands by setting `PYSPARK_DRIVER_PYTHON_OPTS`.
```
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

```diff
@@ -473,7 +473,7 @@ identifierComment

 relationPrimary
     : tableIdentifier sample? (AS? strictIdentifier)?      #tableName
-    | '(' queryNoWith ')' sample? (AS? strictIdentifier)?  #aliasedQuery
+    | '(' queryNoWith ')' sample? (AS? strictIdentifier)   #aliasedQuery
     | '(' relation ')' sample? (AS? strictIdentifier)?     #aliasedRelation
     | inlineTable                                          #inlineTableDefault2
     | functionTable                                        #tableValuedFunction
```
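User-visible effect of dropping the trailing `?`: a parenthesized query now requires an alias, while a parenthesized relation (`#aliasedRelation`) still does not. A quick check, runnable by pasting into `spark-shell` (the session setup is illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("alias-demo").getOrCreate()

// Parses: the subquery carries the mandatory alias `t`.
spark.sql("SELECT * FROM (SELECT 1 AS id) t").show()

// spark.sql("SELECT * FROM (SELECT 1 AS id)")
// now fails to parse; per the new test below, the error is "mismatched input".
```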
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

```diff
@@ -1045,7 +1045,7 @@ class Analyzer(
       // Skip sort with aggregate. This will be handled in ResolveAggregateFunctions
       case sa @ Sort(_, _, child: Aggregate) => sa

-      case s @ Sort(order, _, child) if child.resolved =>
+      case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
         try {
           val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
           val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
@@ -1066,7 +1066,7 @@
           case ae: AnalysisException => s
         }

-      case f @ Filter(cond, child) if child.resolved =>
+      case f @ Filter(cond, child) if !f.resolved && child.resolved =>
         try {
           val newCond = resolveExpressionRecursively(cond, child)
           val requiredAttrs = newCond.references.filter(_.resolved)
```
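The added `!s.resolved` / `!f.resolved` conjuncts are idempotence guards: once the operator itself is resolved, the rule no longer fires, so repeated fixed-point passes stop rewriting nodes they have already handled. A toy sketch of the guard, with stand-in types rather than Catalyst's:

```scala
// Without the !plan.resolved check, a fixed-point rule can keep rewriting
// (and re-allocating) nodes it has already finished with.
sealed trait Plan { def resolved: Boolean }
case object Leaf extends Plan { val resolved = true }
final case class Sort(resolved: Boolean, child: Plan) extends Plan

// Fire only while this node is unresolved but its input is ready, mirroring
// `case s @ Sort(order, _, child) if !s.resolved && child.resolved`.
def resolveOnce(plan: Plan): Plan = plan match {
  case s @ Sort(res, child) if !res && child.resolved => s.copy(resolved = true)
  case other => other // already resolved: leave untouched
}
```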
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

```diff
@@ -745,9 +745,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    * hooks.
    */
   override def visitAliasedQuery(ctx: AliasedQueryContext): LogicalPlan = withOrigin(ctx) {
-    plan(ctx.queryNoWith)
-      .optionalMap(ctx.sample)(withSample)
-      .optionalMap(ctx.strictIdentifier)(aliasPlan)
+    aliasPlan(ctx.strictIdentifier,
+      plan(ctx.queryNoWith).optionalMap(ctx.sample)(withSample))
   }

   /**
```
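With the alias now mandatory in the grammar, the builder can apply `aliasPlan` unconditionally instead of going through `optionalMap`. The definition of `aliasPlan` is outside this diff; a plausible sketch of what it does, under that assumption:

```scala
import org.antlr.v4.runtime.ParserRuleContext
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}

object AliasPlanSketch {
  // Assumed behavior, not the verbatim Spark source: wrap the parsed plan in
  // a SubqueryAlias named after the identifier token's text.
  def aliasPlan(alias: ParserRuleContext, plan: LogicalPlan): LogicalPlan =
    SubqueryAlias(alias.getText, plan)
}
```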
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

```diff
@@ -798,6 +798,12 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
   }
 }

+/**
+ * Aliased subquery.
+ *
+ * @param alias the alias name for this subquery.
+ * @param child the logical plan of this subquery.
+ */
 case class SubqueryAlias(
     alias: String,
     child: LogicalPlan)
```
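For readers new to this node: after this PR the parser always puts a `SubqueryAlias` on top of a parenthesized query. A small construction example (the child plan here is an arbitrary unresolved table reference, chosen for brevity):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias

object SubqueryAliasExample {
  // `(select id as a from t0) tt` parses to SubqueryAlias("tt", <child plan>).
  val aliased = SubqueryAlias("tt", UnresolvedRelation(TableIdentifier("t0")))
}
```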
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala

```diff
@@ -444,6 +444,17 @@ class PlanParserSuite extends PlanTest {
         | (select id from t0)) as u_1
       """.stripMargin,
       plan.union(plan).union(plan).as("u_1").select('id))
+
   }

+  test("aliased subquery") {
+    assertEqual("select a from (select id as a from t0) tt",
+      table("t0").select('id.as("a")).as("tt").select('a))
+    intercept("select a from (select id as a from t0)", "mismatched input")
+
+    assertEqual("from (select id as a from t0) tt select a",
+      table("t0").select('id.as("a")).as("tt").select('a))
+    intercept("from (select id as a from t0) select a", "extraneous input 'a'")
+  }
+
   test("scalar sub-query") {
```
2 changes: 1 addition & 1 deletion sql/core/src/test/resources/sql-tests/inputs/group-by.sql

```diff
@@ -34,7 +34,7 @@ SELECT SKEWNESS(a), KURTOSIS(a), MIN(a), MAX(a), AVG(a), VARIANCE(a), STDDEV(a),
 FROM testData;

 -- Aggregate with foldable input and multiple distinct groups.
-SELECT COUNT(DISTINCT b), COUNT(DISTINCT b, c) FROM (SELECT 1 AS a, 2 AS b, 3 AS c) GROUP BY a;
+SELECT COUNT(DISTINCT b), COUNT(DISTINCT b, c) FROM (SELECT 1 AS a, 2 AS b, 3 AS c) t GROUP BY a;

 -- Aliases in SELECT could be used in GROUP BY
 SELECT a AS k, COUNT(b) FROM testData GROUP BY k;
```
2 changes: 1 addition & 1 deletion sql/core/src/test/resources/sql-tests/inputs/limit.sql

```diff
@@ -21,7 +21,7 @@ SELECT * FROM testdata LIMIT true;
 SELECT * FROM testdata LIMIT 'a';

 -- limit within a subquery
-SELECT * FROM (SELECT * FROM range(10) LIMIT 5) WHERE id > 3;
+SELECT * FROM (SELECT * FROM range(10) LIMIT 5) t WHERE id > 3;

 -- limit ALL
 SELECT * FROM testdata WHERE key < 3 LIMIT ALL;
```
```diff
@@ -7,4 +7,4 @@ select 'a' || 'b' || 'c';

 -- Check if catalyst combine nested `Concat`s
 EXPLAIN EXTENDED SELECT (col1 || col2 || col3 || col4) col
-FROM (SELECT id col1, id col2, id col3, id col4 FROM range(10));
+FROM (SELECT id col1, id col2, id col3, id col4 FROM range(10)) t;
```
```diff
@@ -394,7 +394,7 @@ FROM (SELECT *
                    FROM t1)) t4
        WHERE t4.t2b IN (SELECT Min(t3b)
                         FROM t3
-                        WHERE t4.t2a = t3a));
+                        WHERE t4.t2a = t3a)) T;

 -- UNION, UNION ALL, UNION DISTINCT, INTERSECT and EXCEPT for NOT IN
 -- TC 01.12
```
```diff
@@ -23,7 +23,7 @@ AND t2b = (SELECT max(avg)
            FROM (SELECT t2b, avg(t2b) avg
                  FROM t2
                  WHERE t2a = t1.t1b
-                )
+                ) T
 )
 ;

```
```diff
@@ -19,7 +19,7 @@ AND c.cv = (SELECT max(avg)
             FROM (SELECT c1.cv, avg(c1.cv) avg
                   FROM c c1
                   WHERE c1.ck = p.pk
-                  GROUP BY c1.cv));
+                  GROUP BY c1.cv) T);

 create temporary view t1 as select * from values
 ('val1a', 6S, 8, 10L, float(15.0), 20D, 20E2, timestamp '2014-04-04 00:00:00.000', date '2014-04-04'),
```
4 changes: 2 additions & 2 deletions sql/core/src/test/resources/sql-tests/inputs/union.sql

```diff
@@ -5,15 +5,15 @@ CREATE OR REPLACE TEMPORARY VIEW t2 AS VALUES (1.0, 1), (2.0, 4) tbl(c1, c2);
 SELECT *
 FROM (SELECT * FROM t1
       UNION ALL
-      SELECT * FROM t1);
+      SELECT * FROM t1) T;

 -- Type Coerced Union
 SELECT *
 FROM (SELECT * FROM t1
       UNION ALL
       SELECT * FROM t2
       UNION ALL
-      SELECT * FROM t2);
+      SELECT * FROM t2) T;

 -- Regression test for SPARK-18622
 SELECT a
```
sql/core/src/test/resources/sql-tests/results/group-by.sql.out

```diff
@@ -134,7 +134,7 @@ struct<skewness(CAST(a AS DOUBLE)):double,kurtosis(CAST(a AS DOUBLE)):double,min


 -- !query 14
-SELECT COUNT(DISTINCT b), COUNT(DISTINCT b, c) FROM (SELECT 1 AS a, 2 AS b, 3 AS c) GROUP BY a
+SELECT COUNT(DISTINCT b), COUNT(DISTINCT b, c) FROM (SELECT 1 AS a, 2 AS b, 3 AS c) t GROUP BY a
 -- !query 14 schema
 struct<count(DISTINCT b):bigint,count(DISTINCT b, c):bigint>
 -- !query 14 output
```
sql/core/src/test/resources/sql-tests/results/limit.sql.out

```diff
@@ -93,7 +93,7 @@ The limit expression must be integer type, but got string;


 -- !query 10
-SELECT * FROM (SELECT * FROM range(10) LIMIT 5) WHERE id > 3
+SELECT * FROM (SELECT * FROM range(10) LIMIT 5) t WHERE id > 3
 -- !query 10 schema
 struct<id:bigint>
 -- !query 10 output
```
```diff
@@ -30,20 +30,22 @@ abc

 -- !query 3
 EXPLAIN EXTENDED SELECT (col1 || col2 || col3 || col4) col
-FROM (SELECT id col1, id col2, id col3, id col4 FROM range(10))
+FROM (SELECT id col1, id col2, id col3, id col4 FROM range(10)) t
 -- !query 3 schema
 struct<plan:string>
 -- !query 3 output
 == Parsed Logical Plan ==
 'Project [concat(concat(concat('col1, 'col2), 'col3), 'col4) AS col#x]
-+- 'Project ['id AS col1#x, 'id AS col2#x, 'id AS col3#x, 'id AS col4#x]
-   +- 'UnresolvedTableValuedFunction range, [10]
++- 'SubqueryAlias t
+   +- 'Project ['id AS col1#x, 'id AS col2#x, 'id AS col3#x, 'id AS col4#x]
+      +- 'UnresolvedTableValuedFunction range, [10]

 == Analyzed Logical Plan ==
 col: string
 Project [concat(concat(concat(cast(col1#xL as string), cast(col2#xL as string)), cast(col3#xL as string)), cast(col4#xL as string)) AS col#x]
-+- Project [id#xL AS col1#xL, id#xL AS col2#xL, id#xL AS col3#xL, id#xL AS col4#xL]
-   +- Range (0, 10, step=1, splits=None)
++- SubqueryAlias t
+   +- Project [id#xL AS col1#xL, id#xL AS col2#xL, id#xL AS col3#xL, id#xL AS col4#xL]
+      +- Range (0, 10, step=1, splits=None)

 == Optimized Logical Plan ==
 Project [concat(cast(id#xL as string), cast(id#xL as string), cast(id#xL as string), cast(id#xL as string)) AS col#x]
```
```diff
@@ -496,7 +496,7 @@ FROM (SELECT *
                    FROM t1)) t4
        WHERE t4.t2b IN (SELECT Min(t3b)
                         FROM t3
-                        WHERE t4.t2a = t3a))
+                        WHERE t4.t2a = t3a)) T
 -- !query 13 schema
 struct<t2a:string,t2b:smallint,t2c:int,t2d:bigint,t2e:float,t2f:double,t2g:decimal(2,-2),t2h:timestamp,t2i:date>
 -- !query 13 output
```
```diff
@@ -40,7 +40,7 @@ AND t2b = (SELECT max(avg)
            FROM (SELECT t2b, avg(t2b) avg
                  FROM t2
                  WHERE t2a = t1.t1b
-                )
+                ) T
 )
 -- !query 3 schema
 struct<>
```
```diff
@@ -39,7 +39,7 @@ AND c.cv = (SELECT max(avg)
             FROM (SELECT c1.cv, avg(c1.cv) avg
                   FROM c c1
                   WHERE c1.ck = p.pk
-                  GROUP BY c1.cv))
+                  GROUP BY c1.cv) T)
 -- !query 3 schema
 struct<pk:int,cv:int>
 -- !query 3 output
```
4 changes: 2 additions & 2 deletions sql/core/src/test/resources/sql-tests/results/union.sql.out

```diff
@@ -22,7 +22,7 @@ struct<>
 SELECT *
 FROM (SELECT * FROM t1
       UNION ALL
-      SELECT * FROM t1)
+      SELECT * FROM t1) T
 -- !query 2 schema
 struct<c1:int,c2:string>
 -- !query 2 output
@@ -38,7 +38,7 @@ FROM (SELECT * FROM t1
       UNION ALL
       SELECT * FROM t2
       UNION ALL
-      SELECT * FROM t2)
+      SELECT * FROM t2) T
 -- !query 3 schema
 struct<c1:decimal(11,1),c2:string>
 -- !query 3 output
```