9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1002,6 +1002,15 @@ private[spark] object Utils extends Logging {
    }
  }

  /**
   * Lists files recursively.
   */
  def recursiveList(f: File): Array[File] = {
    require(f.isDirectory)
    val current = f.listFiles
    current ++ current.filter(_.isDirectory).flatMap(recursiveList)
  }

  /**
   * Delete a file or directory and its contents recursively.
   * Don't follow directories if they are symlinks.
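
For reference, a quick usage sketch of the new helper (the directory path here is hypothetical, and since `Utils` is `private[spark]` this only applies to code inside Spark itself):

    val dir = new java.io.File("/tmp/spark-test")   // hypothetical directory
    val everything = Utils.recursiveList(dir)       // files and directories at every depth
    val fileCount = everything.count(_.isFile)      // count only regular files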
75 changes: 75 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

/**
 * A special `RunnableCommand` which writes data out and updates metrics.
 */
trait DataWritingCommand extends RunnableCommand {

  override lazy val metrics: Map[String, SQLMetric] = {
    val sparkContext = SparkContext.getActive.get
    Map(
      "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing time (ms)"),
      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
      "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
      "numParts" -> SQLMetrics.createMetric(sparkContext, "number of dynamic parts")
    )
  }

  /**
   * Callback function that updates the metrics collected from the writing operation.
   */
  protected def updateWritingMetrics(writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
    val sparkContext = SparkContext.getActive.get
    var numPartitions = 0
    var numFiles = 0
    var totalNumBytes: Long = 0L
    var totalNumOutput: Long = 0L
    var totalWritingTime: Long = 0L

    writeSummaries.foreach { summary =>
      numPartitions += summary.updatedPartitions.size
      numFiles += summary.numOutputFile
      totalNumBytes += summary.numOutputBytes
      totalNumOutput += summary.numOutputRows
      totalWritingTime += summary.totalWritingTime
    }

    val avgWritingTime = if (numFiles > 0) totalWritingTime / numFiles else 0L

    metrics("avgTime").add(avgWritingTime)
    metrics("numFiles").add(numFiles)
    metrics("numOutputBytes").add(totalNumBytes)
    metrics("numOutputRows").add(totalNumOutput)
    metrics("numParts").add(numPartitions)

    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
  }
}
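
A minimal sketch of how a concrete command could plug into this trait; `ExampleWriteCommand` and `writeAndCollectSummaries` are illustrative stand-ins, not part of this change (imports as in the surrounding file, plus `SparkSession`, `Row`, and `SparkPlan`):

    case class ExampleWriteCommand() extends DataWritingCommand {
      override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
        // Assumed write path that returns one summary per executed write task.
        val summaries: Seq[ExecutedWriteSummary] = writeAndCollectSummaries(children)
        // Aggregate the summaries and post the metric updates to the driver/UI.
        updateWritingMetrics(summaries)
        Seq.empty[Row]
      }

      private def writeAndCollectSummaries(children: Seq[SparkPlan]): Seq[ExecutedWriteSummary] = ???
    }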
sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
@@ -37,6 +38,11 @@ import org.apache.spark.sql.types._
 * wrapped in `ExecutedCommandExec` during execution.
 */
trait RunnableCommand extends logical.Command {

  // The map used to record the metrics of running the command. This will be passed to
  // `ExecutedCommandExec` during query planning.
  lazy val metrics: Map[String, SQLMetric] = Map.empty

  def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
    throw new NotImplementedError
  }
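
To illustrate the new hook in isolation (the command and its metric are illustrative, not part of this change), any command can now publish metrics by overriding this map:

    case class ExampleCountCommand() extends RunnableCommand {
      override lazy val metrics: Map[String, SQLMetric] =
        Map("numRows" -> SQLMetrics.createMetric(SparkContext.getActive.get, "number of rows"))

      override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
        metrics("numRows").add(1)   // record the single row this dummy command produces
        Seq.empty
      }
    }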
@@ -49,8 +55,14 @@ trait RunnableCommand extends logical.Command {
/**
 * A physical operator that executes the run method of a `RunnableCommand` and
 * saves the result to prevent multiple executions.
 *
 * @param cmd the `RunnableCommand` this operator will run.
 * @param children the children physical plans run by the `RunnableCommand`.
 */
case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) extends SparkPlan {

  // Expose the wrapped command's metrics on this physical node so the SQL UI can display them.
  override lazy val metrics: Map[String, SQLMetric] = cmd.metrics

  /**
   * A concrete command should override this lazy field to wrap up any side effects caused by the
   * command or any other computation that should be evaluated exactly once. The value of this field