Skip to content
Closed
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,15 @@ private[spark] object Utils extends Logging {
}
}

/**
* Lists files recursively.
*/
def recursiveList(f: File): Array[File] = {
require(f.isDirectory)
val current = f.listFiles
current ++ current.filter(_.isDirectory).flatMap(recursiveList)
}

/**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,33 @@ package org.apache.spark.sql.execution.command

import java.util.UUID

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

/**
* A logical command that is executed for its side-effects. `RunnableCommand`s are
* wrapped in `ExecutedCommand` during execution.
*/
trait RunnableCommand extends logical.Command {

// The map used to record the metrics of running the command. This will be passed to
// `ExecutedCommand` during query planning.
lazy val metrics: Map[String, SQLMetric] = Map.empty

def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
throw new NotImplementedError
}
Expand All @@ -46,11 +55,74 @@ trait RunnableCommand extends logical.Command {
}
}

/**
* A special `RunnableCommand` which writes data out and updates metrics.
*/
trait DataWritingCommand extends RunnableCommand {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's move it to a new file

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Sure.


override lazy val metrics: Map[String, SQLMetric] = {
val sparkContext = SparkContext.getActive.get
Map(
"avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing time (ms)"),
"numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
"numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numParts" -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
)
}

/**
* Callback function that update metrics collected from the writing operation.
*/
protected def updateWritingMetrics(writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
val sparkContext = SparkContext.getActive.get
var numPartitions = 0
var numFiles = 0
var totalNumBytes: Long = 0L
var totalNumOutput: Long = 0L
var totalWritingTime: Long = 0L
var numFilesNonZeroWritingTime = 0

writeSummaries.foreach { summary =>
numPartitions += summary.updatedPartitions.size
numFiles += summary.numOutputFile
totalNumBytes += summary.numOutputBytes
totalNumOutput += summary.numOutputRows
totalWritingTime += summary.totalWritingTime
numFilesNonZeroWritingTime += summary.numFilesWithNonZeroWritingTime
}

// We only count non-zero writing time when averaging total writing time.
// The time for writing individual file can be zero if it's less than 1 ms. Zero values can

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only happens if a partition is very small, right?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this should be rare?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's for small partition like the test case.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Since it should be rare, I removed this part so the codes can be simpler.

// lower actual time of writing to zero when calculating average, so excluding them.
val avgWritingTime = if (numFilesNonZeroWritingTime > 0) {
(totalWritingTime / numFilesNonZeroWritingTime).toLong
} else {
0L
}

metrics("avgTime").add(avgWritingTime)
metrics("numFiles").add(numFiles)
metrics("numOutputBytes").add(totalNumBytes)
metrics("numOutputRows").add(totalNumOutput)
metrics("numParts").add(numPartitions)

val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
}
}

/**
* A physical operator that executes the run method of a `RunnableCommand` and
* saves the result to prevent multiple executions.
*
* @param cmd the `RunnableCommand` this operator will run.
* @param children the children physical plans ran by the `RunnableCommand`.
*/
case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) extends SparkPlan {

override lazy val metrics: Map[String, SQLMetric] = cmd.metrics

/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
Expand Down
Loading