1 change: 0 additions & 1 deletion dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -183,7 +183,6 @@ metrics-jmx/4.1.1//metrics-jmx-4.1.1.jar
metrics-json/4.1.1//metrics-json-4.1.1.jar
metrics-jvm/4.1.1//metrics-jvm-4.1.1.jar
minlog/1.3.0//minlog-1.3.0.jar
mssql-jdbc/6.2.1.jre7//mssql-jdbc-6.2.1.jre7.jar
netty-all/4.1.47.Final//netty-all-4.1.47.Final.jar
nimbus-jose-jwt/4.41.1//nimbus-jose-jwt-4.41.1.jar
objenesis/2.5.1//objenesis-2.5.1.jar
Binary file added docs/img/webui-structured-streaming-detail.png
28 changes: 28 additions & 0 deletions docs/web-ui.md
@@ -407,6 +407,34 @@ Here is the list of SQL metrics:

</table>

## Structured Streaming Tab
When Structured Streaming jobs are run in micro-batch mode, a Structured Streaming tab is
available on the Web UI. The overview page displays brief statistics for running and completed
queries, along with the latest exception of a failed query. For detailed statistics, click a
"run id" in the tables.

<p style="text-align: center;">
<img src="img/webui-structured-streaming-detail.png" title="Structured Streaming Query Statistics" alt="Structured Streaming Query Statistics">
</p>

The statistics page displays metrics that provide insight into the status of your streaming
queries. Currently, it contains the following metrics:

* **Input Rate.** The aggregate (across all sources) rate of data arriving.
* **Process Rate.** The aggregate (across all sources) rate at which Spark is processing data.
* **Input Rows.** The aggregate (across all sources) number of records processed in a trigger.
* **Batch Duration.** The processing duration of each batch.
* **Operation Duration.** The amount of time taken to perform various operations in milliseconds.
The tracked operations are listed as follows.
* addBatch: Adds result data of the current batch to the sink.
* getBatch: Gets a new batch of data to process.
* latestOffset: Gets the latest offsets for sources.
* queryPlanning: Generates the execution plan.
* walCommit: Writes the offsets to the metadata log.

As an early-release feature, the statistics page is still under development and will be improved in
future releases.
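
The tab appears as soon as a micro-batch query is running; no extra configuration is needed. As a
minimal, illustrative sketch (the application name, rate-source options, and trigger interval are
arbitrary choices, not requirements), a query like the following keeps the tab populated while it
runs:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("StructuredStreamingTabExample")
  .getOrCreate()

// A built-in rate source feeding a console sink keeps a micro-batch query running,
// which is what makes the Structured Streaming tab and its statistics page appear.
val query = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()
```

While the query is running, the overview page lists it among the running queries, and clicking its
run id opens the statistics page described above.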

## Streaming Tab
The web UI includes a Streaming tab if the application uses Spark streaming. This tab displays
scheduling delay and processing time for each micro-batch in the data stream, which can be useful
4 changes: 4 additions & 0 deletions pom.xml
@@ -1357,6 +1357,10 @@
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP-java7</artifactId>
</exclusion>
<exclusion>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
12 changes: 2 additions & 10 deletions python/pyspark/context.py
@@ -25,7 +25,6 @@
from tempfile import NamedTemporaryFile

from py4j.protocol import Py4JError
from py4j.java_gateway import is_instance_of

from pyspark import accumulators
from pyspark.accumulators import Accumulator
@@ -865,17 +864,10 @@ def union(self, rdds):
first_jrdd_deserializer = rdds[0]._jrdd_deserializer
if any(x._jrdd_deserializer != first_jrdd_deserializer for x in rdds):
rdds = [x._reserialize() for x in rdds]
gw = SparkContext._gateway
cls = SparkContext._jvm.org.apache.spark.api.java.JavaRDD
is_jrdd = is_instance_of(gw, rdds[0]._jrdd, cls)
jrdds = gw.new_array(cls, len(rdds))
jrdds = SparkContext._gateway.new_array(cls, len(rdds))
for i in range(0, len(rdds)):
if is_jrdd:
jrdds[i] = rdds[i]._jrdd
else:
# zip could return JavaPairRDD hence we ensure `_jrdd`
# to be `JavaRDD` by wrapping it in a `map`
jrdds[i] = rdds[i].map(lambda x: x)._jrdd
jrdds[i] = rdds[i]._jrdd
return RDD(self._jsc.union(jrdds), self, rdds[0]._jrdd_deserializer)

def broadcast(self, value):
9 changes: 0 additions & 9 deletions python/pyspark/tests/test_rdd.py
@@ -168,15 +168,6 @@ def test_zip_chaining(self):
set([(x, (x, x)) for x in 'abc'])
)

def test_union_pair_rdd(self):
# Regression test for SPARK-31788
rdd = self.sc.parallelize([1, 2])
pair_rdd = rdd.zip(rdd)
self.assertEqual(
self.sc.union([pair_rdd, pair_rdd]).collect(),
[((1, 1), (2, 2)), ((1, 1), (2, 2))]
)

def test_deleting_input_files(self):
# Regression test for SPARK-1025
tempFile = tempfile.NamedTemporaryFile(delete=False)
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst

import java.sql.{Date, Timestamp}
import java.time.LocalDate
import java.time.{Instant, LocalDate}

import scala.language.implicitConversions

Expand Down Expand Up @@ -152,6 +152,7 @@ package object dsl {
implicit def bigDecimalToLiteral(d: java.math.BigDecimal): Literal = Literal(d)
implicit def decimalToLiteral(d: Decimal): Literal = Literal(d)
implicit def timestampToLiteral(t: Timestamp): Literal = Literal(t)
implicit def instantToLiteral(i: Instant): Literal = Literal(i)
implicit def binaryToLiteral(a: Array[Byte]): Literal = Literal(a)

implicit def symbolToUnresolvedAttribute(s: Symbol): analysis.UnresolvedAttribute =
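The new `instantToLiteral` conversion mirrors the existing `timestampToLiteral`. A hedged sketch of how it might be exercised from Catalyst test code (the `dsl.expressions._` import is the usual entry point for these implicits; the timestamp value is arbitrary):

```scala
import java.time.Instant

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal

// With instantToLiteral in scope, a java.time.Instant can be used directly
// wherever a Literal expression is expected.
val ts: Literal = Instant.parse("2020-06-01T12:00:00Z")
```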
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{FUNC_ALIAS, FunctionBuilder}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util._
@@ -311,7 +311,12 @@ case object NamePlaceholder extends LeafExpression with Unevaluable {
/**
* Returns a Row containing the evaluation of all children expressions.
*/
object CreateStruct extends FunctionBuilder {
object CreateStruct {
/**
* Returns a named struct with generated names or using the names when available.
* It should not be used for `struct` expressions or functions explicitly called
* by users.
*/
def apply(children: Seq[Expression]): CreateNamedStruct = {
CreateNamedStruct(children.zipWithIndex.flatMap {
case (e: NamedExpression, _) if e.resolved => Seq(Literal(e.name), e)
@@ -320,12 +325,23 @@ object CreateStruct extends FunctionBuilder {
})
}

/**
* Returns a named struct with a pretty SQL name. It will show the pretty SQL string
* in its output column name as if `struct(...)` was called. Should be
* used for `struct` expressions or functions explicitly called by users.
*/
def create(children: Seq[Expression]): CreateNamedStruct = {
val expr = CreateStruct(children)
expr.setTagValue(FUNC_ALIAS, "struct")
expr
}

/**
* Entry to use in the function registry.
*/
val registryEntry: (String, (ExpressionInfo, FunctionBuilder)) = {
val info: ExpressionInfo = new ExpressionInfo(
"org.apache.spark.sql.catalyst.expressions.NamedStruct",
classOf[CreateNamedStruct].getCanonicalName,
null,
"struct",
"_FUNC_(col1, col2, col3, ...) - Creates a struct with the given field values.",
@@ -335,7 +351,7 @@ object CreateStruct extends FunctionBuilder {
"",
"",
"")
("struct", (info, this))
("struct", (info, this.create))
}
}

@@ -433,7 +449,15 @@ case class CreateNamedStruct(children: Seq[Expression]) extends Expression {
""".stripMargin, isNull = FalseLiteral)
}

override def prettyName: String = "named_struct"
// There is an alias set at `CreateStruct.create`. If there is an alias,
// this is the struct function explicitly called by a user and we should
// respect it in the SQL string as `struct(...)`.
override def prettyName: String = getTagValue(FUNC_ALIAS).getOrElse("named_struct")

override def sql: String = getTagValue(FUNC_ALIAS).map { alias =>
val childrenSQL = children.indices.filter(_ % 2 == 1).map(children(_).sql).mkString(", ")
s"$alias($childrenSQL)"
}.getOrElse(super.sql)
}
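
The user-visible effect of the `FUNC_ALIAS` tag is in generated column names and SQL strings. A small sketch of the intended behavior (the DataFrame and its column names are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

val spark = SparkSession.builder().master("local[*]").appName("struct-alias").getOrCreate()
import spark.implicits._

val df = Seq((1, "a")).toDF("id", "name")

// struct() from the public API goes through CreateStruct.create, so the alias tag is set
// and the generated column should render as struct(id, name) rather than
// named_struct(id, id, name, name).
df.select(struct($"id", $"name")).printSchema()
```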

/**
@@ -1534,7 +1534,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* Create a [[CreateStruct]] expression.
*/
override def visitStruct(ctx: StructContext): Expression = withOrigin(ctx) {
CreateStruct(ctx.argument.asScala.map(expression))
CreateStruct.create(ctx.argument.asScala.map(expression))
}

/**
@@ -22,6 +22,7 @@ import java.util.UUID

import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
@@ -50,7 +51,7 @@ import org.apache.spark.util.Utils
class QueryExecution(
val sparkSession: SparkSession,
val logical: LogicalPlan,
val tracker: QueryPlanningTracker = new QueryPlanningTracker) {
val tracker: QueryPlanningTracker = new QueryPlanningTracker) extends Logging {

// TODO: Move the planner an optimizer into here from SessionState.
protected def planner = sparkSession.sessionState.planner
@@ -133,26 +134,42 @@
tracker.measurePhase(phase)(block)
}

def simpleString: String = simpleString(false)

def simpleString(formatted: Boolean): String = withRedaction {
def simpleString: String = {
val concat = new PlanStringConcat()
concat.append("== Physical Plan ==\n")
simpleString(false, SQLConf.get.maxToStringFields, concat.append)
withRedaction {
concat.toString
}
}

private def simpleString(
formatted: Boolean,
maxFields: Int,
append: String => Unit): Unit = {
append("== Physical Plan ==\n")
if (formatted) {
try {
ExplainUtils.processPlan(executedPlan, concat.append)
ExplainUtils.processPlan(executedPlan, append)
} catch {
case e: AnalysisException => concat.append(e.toString)
case e: IllegalArgumentException => concat.append(e.toString)
case e: AnalysisException => append(e.toString)
case e: IllegalArgumentException => append(e.toString)
}
} else {
QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false)
QueryPlan.append(executedPlan,
append, verbose = false, addSuffix = false, maxFields = maxFields)
}
concat.append("\n")
concat.toString
append("\n")
}

def explainString(mode: ExplainMode): String = {
val concat = new PlanStringConcat()
explainString(mode, SQLConf.get.maxToStringFields, concat.append)
withRedaction {
concat.toString
}
}

private def explainString(mode: ExplainMode, maxFields: Int, append: String => Unit): Unit = {
val queryExecution = if (logical.isStreaming) {
// This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`, so the
// output mode does not matter since there is no `Sink`.
@@ -165,19 +182,19 @@

mode match {
case SimpleMode =>
queryExecution.simpleString
queryExecution.simpleString(false, maxFields, append)
case ExtendedMode =>
queryExecution.toString
queryExecution.toString(maxFields, append)
case CodegenMode =>
try {
org.apache.spark.sql.execution.debug.codegenString(queryExecution.executedPlan)
org.apache.spark.sql.execution.debug.writeCodegen(append, queryExecution.executedPlan)
} catch {
case e: AnalysisException => e.toString
case e: AnalysisException => append(e.toString)
}
case CostMode =>
queryExecution.stringWithStats
queryExecution.stringWithStats(maxFields, append)
case FormattedMode =>
queryExecution.simpleString(formatted = true)
queryExecution.simpleString(formatted = true, maxFields = maxFields, append)
}
}
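
A hedged sketch of how these modes can be exercised from user code, assuming the `ExplainMode` case objects are importable from `org.apache.spark.sql.execution` and using an arbitrary local DataFrame:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{CodegenMode, FormattedMode}

val spark = SparkSession.builder().master("local[*]").appName("explain-modes").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "name").groupBy("name").count()

// Each call renders the same query through a different branch of the match above.
println(df.queryExecution.explainString(FormattedMode))
println(df.queryExecution.explainString(CodegenMode))
```

The user-facing `df.explain("formatted")` exposes the same set of mode names.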

@@ -204,27 +221,39 @@

override def toString: String = withRedaction {
val concat = new PlanStringConcat()
writePlans(concat.append, SQLConf.get.maxToStringFields)
concat.toString
toString(SQLConf.get.maxToStringFields, concat.append)
withRedaction {
concat.toString
}
}

private def toString(maxFields: Int, append: String => Unit): Unit = {
writePlans(append, maxFields)
}

def stringWithStats: String = withRedaction {
def stringWithStats: String = {
val concat = new PlanStringConcat()
stringWithStats(SQLConf.get.maxToStringFields, concat.append)
withRedaction {
concat.toString
}
}

private def stringWithStats(maxFields: Int, append: String => Unit): Unit = {
val maxFields = SQLConf.get.maxToStringFields

// trigger to compute stats for logical plans
try {
optimizedPlan.stats
} catch {
case e: AnalysisException => concat.append(e.toString + "\n")
case e: AnalysisException => append(e.toString + "\n")
}
// only show optimized logical plan and physical plan
concat.append("== Optimized Logical Plan ==\n")
QueryPlan.append(optimizedPlan, concat.append, verbose = true, addSuffix = true, maxFields)
concat.append("\n== Physical Plan ==\n")
QueryPlan.append(executedPlan, concat.append, verbose = true, addSuffix = false, maxFields)
concat.append("\n")
concat.toString
append("== Optimized Logical Plan ==\n")
QueryPlan.append(optimizedPlan, append, verbose = true, addSuffix = true, maxFields)
append("\n== Physical Plan ==\n")
QueryPlan.append(executedPlan, append, verbose = true, addSuffix = false, maxFields)
append("\n")
}

/**
@@ -261,19 +290,26 @@ class QueryExecution(
/**
* Dumps debug information about query execution into the specified file.
*
* @param path path of the file the debug info is written to.
* @param maxFields maximum number of fields converted to string representation.
* @param explainMode the explain mode to be used to generate the string
* representation of the plan.
*/
def toFile(path: String, maxFields: Int = Int.MaxValue): Unit = {
def toFile(
path: String,
maxFields: Int = Int.MaxValue,
explainMode: Option[String] = None): Unit = {
val filePath = new Path(path)
val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
val append = (s: String) => {
writer.write(s)
}
try {
writePlans(append, maxFields)
writer.write("\n== Whole Stage Codegen ==\n")
org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
val mode = explainMode.map(ExplainMode.fromString(_)).getOrElse(ExtendedMode)
explainString(mode, maxFields, writer.write)
if (mode != CodegenMode) {
writer.write("\n== Whole Stage Codegen ==\n")
org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
}
log.info(s"Debug information was written at: $filePath")
} finally {
writer.close()
}
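A hedged usage sketch for the extended `toFile` signature added above (the output paths and the DataFrame are arbitrary):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("toFile-example").getOrCreate()
import spark.implicits._

val df = Seq(1, 2, 3).toDF("id").filter($"id" > 1)

// No mode given: falls back to ExtendedMode and appends the whole-stage-codegen dump.
df.queryExecution.toFile("/tmp/plan-extended.txt")

// Mode strings go through ExplainMode.fromString; for any mode other than "codegen"
// the codegen section is still appended after the plan.
df.queryExecution.toFile("/tmp/plan-formatted.txt", explainMode = Some("formatted"))
```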
@@ -1306,7 +1306,7 @@ object functions {
* @since 1.4.0
*/
@scala.annotation.varargs
def struct(cols: Column*): Column = withExpr { CreateStruct(cols.map(_.expr)) }
def struct(cols: Column*): Column = withExpr { CreateStruct.create(cols.map(_.expr)) }

/**
* Creates a new struct column that composes multiple input columns.