Skip to content
Closed
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,15 @@ private[spark] object Utils extends Logging {
}
}

/**
 * Lists all files and directories under `f` recursively.
 *
 * The result contains the direct children of `f` first, followed by the recursive listing of
 * each child directory. `f` itself is not part of the result.
 *
 * @param f the directory to list; it must be a directory.
 * @return every file and directory found below `f`.
 * @throws IllegalArgumentException if `f` is not a directory.
 * @throws java.io.IOException if the contents of a directory cannot be listed.
 */
def recursiveList(f: File): Array[File] = {
  require(f.isDirectory, s"$f is not a directory")
  // `File.listFiles` returns null (not an empty array) when an I/O error occurs, so fail with
  // a descriptive error here instead of an opaque NullPointerException further down.
  val current = Option(f.listFiles).getOrElse {
    throw new java.io.IOException(s"Failed to list files for dir: $f")
  }
  current ++ current.filter(_.isDirectory).flatMap(recursiveList)
}

/**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
Expand Down Expand Up @@ -2659,6 +2668,21 @@ private[spark] object Utils extends Logging {
redact(redactionPattern, kvs.toArray)
}

/**
* Computes the average of all elements in an `Iterable`. If there is no element, returns 0.
*/
def average[T](ts: Iterable[T])(implicit num: Numeric[T]): Double = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we require an array-typed parameter? We are calling `size` twice in this method, which can be very slow if the input is not an indexed seq.

if (ts.isEmpty) {
0.0
} else {
var count = 0
val sum = ts.reduce { (sum, ele) =>
count += 1
num.plus(sum, ele)
}
num.toDouble(sum) / (count + 1)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this better? Now we don't call size on the Iterable.

}
}
}

private[util] object CallerContext extends Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,65 @@ package org.apache.spark.sql.execution.command

import java.util.UUID

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

/**
* A logical command that is executed for its side-effects. `RunnableCommand`s are
* wrapped in `ExecutedCommand` during execution.
*/
trait RunnableCommand extends logical.Command {

// The map used to record the metrics of running the command. This will be passed to
// `ExecutedCommand` during query planning.
private[sql] lazy val metrics: Map[String, SQLMetric] = Map.empty
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually we don't need `private[sql]` under the execution package.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, got it.


/**
* Callback function that updates the metrics collected from the writing operation.
*/
protected def callbackMetricsUpdater(writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's more reasonable to do this in InsertIntoHadoopFsRelation. If you are worried about duplicated code, we can create a trait for InsertIntoHadoopFsRelation and InsertIntoHive

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I created a trait for them.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about `updateWritingMetrics`?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me.

val sparkContext = SparkContext.getActive.get
var numPartitions = 0
var numFiles = 0
var totalNumBytes: Long = 0L
var totalNumOutput: Long = 0L

writeSummaries.foreach { summary =>
numPartitions += summary.updatedPartitions.size
numFiles += summary.numOutputFile
totalNumBytes += summary.numOutputBytes
totalNumOutput += summary.numOutputRows
}

// The time for writing an individual file can be zero if it's less than 1 ms. Zero values can
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only happens if a partition is very small, right?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this should be rare?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's for small partitions like the one in the test case.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Since it should be rare, I removed this part so the code can be simpler.

// lower the computed average writing time, so we exclude them.
val avgWritingTime =
Utils.average(writeSummaries.flatMap(_.writingTimePerFile.filter(_ > 0))).toLong

metrics("avgTime").add(avgWritingTime)
metrics("numFiles").add(numFiles)
metrics("numOutputBytes").add(totalNumBytes)
metrics("numOutputRows").add(totalNumOutput)
metrics("numParts").add(numPartitions)

val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
}

/**
 * Runs this command against the given physical children.
 *
 * This default implementation always throws `NotImplementedError`.
 *
 * @param sparkSession the session passed to the command.
 * @param children the physical plans passed to the command.
 * @return the rows produced by the command.
 */
def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = ???
Expand All @@ -49,8 +90,14 @@ trait RunnableCommand extends logical.Command {
/**
* A physical operator that executes the run method of a `RunnableCommand` and
* saves the result to prevent multiple executions.
*
* @param cmd the `RunnableCommand` this operator will run.
* @param children the child physical plans run by the `RunnableCommand`.
*/
case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) extends SparkPlan {

override lazy val metrics: Map[String, SQLMetric] = cmd.metrics

/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
Expand Down
Loading