Closed
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Expand Up @@ -1002,6 +1002,15 @@ private[spark] object Utils extends Logging {
}
}

/**
* Lists files recursively.
*/
def recursiveList(f: File): Array[File] = {
require(f.isDirectory)
val current = f.listFiles
current ++ current.filter(_.isDirectory).flatMap(recursiveList)
}

/**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
Expand Down Expand Up @@ -2659,6 +2668,13 @@ private[spark] object Utils extends Logging {
redact(redactionPattern, kvs.toArray)
}

def average[T](ts: Iterable[T])(implicit num: Numeric[T]): Double = {

Contributor

Shall we require an array-typed parameter? We call size twice in this method, which can be very slow if the input is not an indexed sequence.
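
One way to address the concern above without restricting the parameter type is to traverse the input once. The following is only a sketch under assumptions, not the code from this PR: the object name `AverageSketch` is hypothetical, and unlike the diff it accumulates in `Double` rather than summing in `T` first, which can change overflow and rounding behavior.

```scala
object AverageSketch {
  // Single-pass variant: walks the collection once and never calls `size`,
  // so a List or other non-indexed input is traversed only one time.
  def average[T](ts: Iterable[T])(implicit num: Numeric[T]): Double = {
    var sum = 0.0   // running total, accumulated as Double (assumption, see lead-in)
    var count = 0L  // number of elements seen so far
    ts.foreach { t =>
      sum += num.toDouble(t)
      count += 1
    }
    // Same convention as the diff: an empty input averages to 0.
    if (count > 0) sum / count else 0.0
  }
}
```

For example, `AverageSketch.average(List(1, 2, 3))` walks the list once, whereas the version in the diff traverses it for `sum` and again for each `size` call.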

if (ts.size > 0) {
num.toDouble(ts.sum) / ts.size
} else {
0
}
}
}

private[util] object CallerContext extends Logging {
Expand Up @@ -117,7 +117,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
* `SparkSQLDriver` for CLI applications.
*/
def hiveResultString(): Seq[String] = executedPlan match {
case ExecutedCommandExec(desc: DescribeTableCommand, _) =>
case ExecutedCommandExec(desc: DescribeTableCommand) =>
// If it is a describe command for a Hive table, we want to have the output format
// be similar with Hive.
desc.run(sparkSession).map {
Expand All @@ -128,7 +128,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
.mkString("\t")
}
// SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
case command @ ExecutedCommandExec(s: ShowTablesCommand, _) if !s.isExtended =>
case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
command.executeCollect().map(_.getString(1))
case other =>
val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
Expand Up @@ -346,7 +346,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case r: RunnableCommand => ExecutedCommandExec(r, r.children.map(planLater)) :: Nil
case f: FileWritingCommand => FileWritingCommandExec(f, f.children.map(planLater)) :: Nil

case r: RunnableCommand => ExecutedCommandExec(r) :: Nil

case MemoryPlan(sink, output) =>
val encoder = RowEncoder(sink.schema)
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.util.Utils

/**
* A logical command specialized for writing data out. `FileWritingCommand`s are
* wrapped in `FileWritingCommandExec` during execution.
*/
trait FileWritingCommand extends logical.Command {

// The caller of `FileWritingCommand` can replace the metrics location by providing this external
// metrics structure.
private var _externalMetrics: Option[Map[String, SQLMetric]] = None
private[sql] def withExternalMetrics(map: Map[String, SQLMetric]): this.type = {
_externalMetrics = Option(map)
this
}

Contributor

It's not clear to me why we need this and the comment is not helpful.
Looks to me like this is currently either null/None, or the map that's defined below, in def metrics.

Member Author

I've explained the purpose of this in the comments below.

When one command invokes another command to write data, we have to update the metrics in the physical plan of the first command, not the second.

Before we invoke the second command, we have to replace the metrics in its physical plan with the metrics of the first one (i.e., the external metrics).

It seems to me that you missed that there are two different commands involved.

Contributor

Indeed, I missed it at first, but then I got it. See below: #18159 (comment)

Member Author

I should have a better code comment on this part too. I'll add it in the next commit.
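
The relationship discussed in this thread can be illustrated with a simplified model. This is a sketch only: `Counter` and `WriteCommand` are hypothetical stand-ins for `SQLMetric` and `FileWritingCommand`, and none of the Spark planning machinery is reproduced. It shows that when the outer command hands its metrics to the inner one, the inner write updates the counters the outer plan would display.

```scala
object SharedMetricsSketch {
  // Hypothetical stand-in for SQLMetric: a mutable counter.
  final class Counter {
    var value: Long = 0L
    def add(v: Long): Unit = value += v
  }

  // Hypothetical stand-in for a FileWritingCommand that writes `rows` rows.
  final class WriteCommand(rows: Long) {
    private var external: Option[Map[String, Counter]] = None
    private lazy val own: Map[String, Counter] = Map("numOutputRows" -> new Counter)

    // Mirrors the shape of withExternalMetrics in the diff.
    def withExternalMetrics(m: Map[String, Counter]): this.type = {
      external = Option(m)
      this
    }

    // External metrics win; otherwise the command uses its own.
    def metrics: Map[String, Counter] = external.getOrElse(own)

    def run(): Unit = metrics("numOutputRows").add(rows)
  }

  // The outer command owns the metrics shown in the UI and delegates the write.
  def runOuter(outerMetrics: Map[String, Counter], rows: Long): Unit =
    new WriteCommand(rows).withExternalMetrics(outerMetrics).run()
}
```

Without the `withExternalMetrics` call, the inner command would update only its own private counters and the outer map would stay at zero.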


/**
* Those metrics will be updated once the command finishes writing data out. Those metrics will
* be taken by `FileWritingCommandExec` as its metrics when showing in UI.
*/
def metrics(sparkContext: SparkContext): Map[String, SQLMetric] = _externalMetrics.getOrElse {
Map(
// General metrics.
"avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing time (ms)"),
"numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
"numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numParts" -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
)
}

/**
* Callback function that updates the metrics collected from the writing operation.
*/
private[sql] def postDriverMetrics(sparkContext: SparkContext, metrics: Map[String, SQLMetric])
(writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
var numPartitions = 0
var numFiles = 0
var totalNumBytes: Long = 0L
var totalNumOutput: Long = 0L

writeSummaries.foreach { summary =>
numPartitions += summary.updatedPartitions.size
numFiles += summary.numOutputFile
totalNumBytes += summary.numOutputBytes
totalNumOutput += summary.numOutputRows
}

// The time for writing an individual file can be zero if it's less than 1 ms. Zero values can
// lower the actual writing time when calculating the average, so we exclude them.

Contributor

Seems arbitrary to exclude 0 values. 1ms values also bring the average down, yet you're keeping those.

Member Author

I exclude 0 values because they can pull the average writing time down to 0. It doesn't really make sense to show a writing time of 0, since that suggests the write cost no time.

A 1 ms value also brings the average down, but you still get a meaningful time value.
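
The effect under discussion can be checked with a small worked example. The timing values below are made up for illustration, and `average` is a local helper, not `Utils.average` from the diff.

```scala
object ZeroFilterSketch {
  // Local helper with the same empty-input convention as the diff (returns 0).
  def average(xs: Seq[Long]): Double =
    if (xs.nonEmpty) xs.sum.toDouble / xs.size else 0.0

  // Hypothetical per-file writing times in ms; sub-millisecond writes show up as 0.
  val times: Seq[Long] = Seq(0L, 0L, 4L, 6L)

  val withZeros: Double = average(times)                   // (0 + 0 + 4 + 6) / 4 = 2.5
  val withoutZeros: Double = average(times.filter(_ > 0))  // (4 + 6) / 2 = 5.0
}
```

Note that if every file reports 0, the filtered sequence is empty and the average is still 0, so the filter changes the reported value only when at least one write took 1 ms or longer.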

val writingTime =
Utils.average(writeSummaries.flatMap(_.writingTimePerFile.filter(_ > 0))).toLong

val metricsNames = metrics.keys.toSeq.sorted

Contributor

How do you guarantee that the metrics map contains avg, numFiles, etc., given that it's created by givenMetrics.getOrElse?

Member Author

givenMetrics either comes from another FileWritingCommandExec or is an empty map. An empty map means the wrapped command won't call this callback. But I agree this is a loose guarantee. I'll update this in the next commit.

val metricsValues = Seq(writingTime, numFiles, totalNumBytes, totalNumOutput, numPartitions)
metricsNames.zip(metricsValues).foreach(x => metrics(x._1).add(x._2))

Contributor

This doesn't make sense if _externalMetrics is provided, as there's no guarantee that the user-provided map is exactly as you expect. It just happens to work when it's an empty map, but it's very fragile, not to mention the fact that it relies on the alphabetical ordering of the keys. Please rework this.

Member Author

I am not sure I understand your point correctly.

The external metrics always come from the same kind of trait. They don't come from the user, so how is this a user-provided map?

The order of the keys returned by map.keys is not guaranteed, as I found when I tried it. Without sorting, how do we make sure we match the metrics values with the correct metrics keys?

Contributor

I meant user as in caller of this function. This function only works when the input satisfies some requirements and silently fails otherwise. I get it that it's a private method, but it's very error prone.

Imagine I want to extend this by adding another metric called avgNumFilesPerPart. If I add it to the end of the metricsValues Seq, then all metrics will get messed up (because of the ordering).

At the very least, leave a comment saying that the metricsValues need to be sorted alphabetically (and rename writingTime to avgWritingTime). But you should rather consider using a WriteMetrics class instead of a Map.

Member Author

For the purpose of metrics, this change is a bit too large. I'd rather not increase the complexity for now. I added a comment for this.
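
The fragility raised in this thread can be reproduced in isolation. The sketch below is an assumption-laden illustration: `Counter` is a hypothetical stand-in for `SQLMetric`, `updateBySortedKeys` imitates the positional zip from the diff, and `updateByName` is one possible keyed alternative, not something proposed verbatim in the PR.

```scala
object MetricsUpdateSketch {
  // Hypothetical stand-in for SQLMetric: a mutable counter.
  final class Counter {
    var value: Long = 0L
    def add(v: Long): Unit = value += v
  }

  // The five metric names from the diff, already in alphabetical order.
  val names: Seq[String] =
    Seq("avgTime", "numFiles", "numOutputBytes", "numOutputRows", "numParts")

  // Positional approach, as in the diff: values are matched to sorted keys by index.
  def updateBySortedKeys(metrics: Map[String, Counter], values: Seq[Long]): Unit =
    metrics.keys.toSeq.sorted.zip(values).foreach { case (k, v) => metrics(k).add(v) }

  // Keyed approach: each value names its metric, so ordering no longer matters.
  // Names missing from `metrics` are skipped rather than throwing.
  def updateByName(metrics: Map[String, Counter], values: Map[String, Long]): Unit =
    values.foreach { case (k, v) => metrics.get(k).foreach(_.add(v)) }
}
```

If a metric named avgNumFilesPerPart is added to the map and its value is appended to the end of the values sequence, the positional version shifts every value by one slot, because avgNumFilesPerPart sorts before avgTime. The keyed version is unaffected.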


val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metricsNames.map(metrics(_)))
}

def run(
sparkSession: SparkSession,
children: Seq[SparkPlan],
metrics: Map[String, SQLMetric],
metricsCallback: (Seq[ExecutedWriteSummary]) => Unit): Seq[Row]

Contributor

Why does this take metrics as a parameter, when the trait already has a metrics member?

A big part of this patch is just about passing down metrics through this interface. It also makes it quite hard to follow. Is there no way we can avoid this? It would be a significant improvement if you could find a solution.

Basically try to specify the metrics (either the Map you have, or None) together with the corresponding callback function just once, when you first instantiate FileWritingCommandExec in SparkStrategies.scala. Then you won't need to pass around metrics all over the place and you also won't need the transform ... withExternalMetrics hack.

@viirya viirya Jun 6, 2017

Member Author

The case is more complicated...

We have commands that don't write the data themselves but invoke other commands to do so. The execution data shown in the UI is bound to the original commands, not the invoked commands. The invoked commands update the metrics in their own physical plans, not the caller's. If we don't pass the metrics or the callback function, the correct metrics won't actually be updated. That's why we need to pass the metrics or the callback function here...

Contributor

I understand that, but I think it can still be simplified. See below: #18159 (comment)

}

/**
* A physical operator specialized to execute the run method of a `FileWritingCommand`,
* save the result to prevent multiple executions, and record necessary metrics for UI.
*/
case class FileWritingCommandExec(
cmd: FileWritingCommand,
children: Seq[SparkPlan]) extends CommandExec {

override lazy val metrics = cmd.metrics(sqlContext.sparkContext)

protected[sql] lazy val invokeCommand: Seq[Row] =
cmd.run(sqlContext.sparkSession, children, metrics,
cmd.postDriverMetrics(sqlContext.sparkContext, metrics))
}
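
The `cmd.postDriverMetrics(sqlContext.sparkContext, metrics)` argument above supplies only the first parameter list of a two-list method, which leaves a function over the write summaries. A minimal sketch of that pattern follows; `Counter`, `Summary`, `postMetrics`, and `runWrite` are hypothetical names, and the real method also takes a `SparkContext`, which is omitted here.

```scala
object CurriedCallbackSketch {
  // Hypothetical stand-ins for SQLMetric and ExecutedWriteSummary.
  final class Counter {
    var value: Long = 0L
    def add(v: Long): Unit = value += v
  }
  final case class Summary(numOutputRows: Long)

  // Two parameter lists, like postDriverMetrics: the first fixes the target
  // metrics, the second receives the write summaries later.
  def postMetrics(metrics: Map[String, Counter])(summaries: Seq[Summary]): Unit =
    metrics("numOutputRows").add(summaries.map(_.numOutputRows).sum)

  // Stand-in for a command's run method, which only sees the callback.
  def runWrite(callback: Seq[Summary] => Unit): Unit =
    callback(Seq(Summary(3L), Summary(4L)))

  // Passing postMetrics(m) where a function is expected yields the callback;
  // the metrics map is captured and the writer never needs to know about it.
  def demo(m: Map[String, Counter]): Unit = runWrite(postMetrics(m))
}
```

Because the callback already closes over the metrics, a writer that receives only the callback can still update the right counters, which is the basis of the reviewer's question about whether `metrics` must be passed separately.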
Expand Up @@ -35,20 +35,16 @@ import org.apache.spark.sql.types._
* wrapped in `ExecutedCommand` during execution.
*/
trait RunnableCommand extends logical.Command {
def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
throw new NotImplementedError
}

def run(sparkSession: SparkSession): Seq[Row] = {
throw new NotImplementedError
}
def run(sparkSession: SparkSession): Seq[Row]
}

/**
* A physical operator that executes the run method of a `RunnableCommand` and
* A physical operator that executes the run method of a `logical.Command` and
* saves the result to prevent multiple executions.
*/
case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) extends SparkPlan {
trait CommandExec extends SparkPlan {
val cmd: logical.Command

/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
Expand All @@ -60,14 +56,11 @@ case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) e
*/
protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
val converter = CatalystTypeConverters.createToCatalystConverter(schema)
val rows = if (children.isEmpty) {
cmd.run(sqlContext.sparkSession)
} else {
cmd.run(sqlContext.sparkSession, children)
}
rows.map(converter(_).asInstanceOf[InternalRow])
invokeCommand.map(converter(_).asInstanceOf[InternalRow])
}

protected[sql] val invokeCommand: Seq[Row]

Contributor

The name looks weird for a val. Shall we make it a def?

Member Author

Sure. Sounds good.
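
One consequence of the rename is worth spelling out: if invokeCommand becomes a def, the run-once guarantee rests on the lazy val that consumes it. The sketch below uses hypothetical names (`Exec`, `AsDef`, `runs`) and plain strings instead of rows; it is an illustration of Scala evaluation semantics, not the code from the next commit.

```scala
object ValVersusDefSketch {
  var runs = 0 // counts how often the "command" body executes

  trait Exec {
    // Abstract def, as suggested in the review.
    def invokeCommand: Seq[String]
    // Evaluated on first access and cached afterwards.
    lazy val sideEffectResult: Seq[String] = invokeCommand
  }

  object AsDef extends Exec {
    // A plain def re-runs its body on every direct call.
    def invokeCommand: Seq[String] = { runs += 1; Seq("done") }
  }
}
```

Accessing `AsDef.sideEffectResult` repeatedly runs the body once, while each direct call to `AsDef.invokeCommand` runs it again. Implementing the member as a `lazy val` in the concrete class, as the diff does, keeps direct access idempotent as well.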


override def innerChildren: Seq[QueryPlan[_]] = cmd.innerChildren

override def output: Seq[Attribute] = cmd.output
Expand All @@ -85,6 +78,15 @@ case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) e
}
}

/**
* A physical operator specialized to execute the run method of a `RunnableCommand` and
* save the result to prevent multiple executions.
*/
case class ExecutedCommandExec(cmd: RunnableCommand) extends CommandExec {
protected[sql] lazy val invokeCommand: Seq[Row] = cmd.run(sqlContext.sparkSession)
override def children: Seq[SparkPlan] = Nil
}

/**
* An explain command for users to see how a command will be executed.
*
Expand Up @@ -21,10 +21,13 @@ import java.net.URI

import org.apache.hadoop.fs.Path

import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.sources.BaseRelation

/**
Expand Down Expand Up @@ -120,11 +123,31 @@ case class CreateDataSourceTableAsSelectCommand(
table: CatalogTable,
mode: SaveMode,
query: LogicalPlan)
extends RunnableCommand {
extends FileWritingCommand {

/**
* The code path this command writes data out depends on the type of data source:
*
* For FileFormat-based data sources, `InsertIntoHadoopFsRelationCommand` is invoked and we
* can update metrics.
*
* For other data sources, `CreatableRelationProvider.createRelation` will be called. We can't
* record metrics for that. So we will return empty metrics map.
*/
override def metrics(sparkContext: SparkContext): Map[String, SQLMetric] =
if (classOf[FileFormat].isAssignableFrom(DataSource.lookupDataSource(table.provider.get))) {
super.metrics(sparkContext)
} else {
Map.empty
}

override def innerChildren: Seq[LogicalPlan] = Seq(query)

override def run(sparkSession: SparkSession): Seq[Row] = {
override def run(
sparkSession: SparkSession,
children: Seq[SparkPlan],
metrics: Map[String, SQLMetric],
metricsCallback: (Seq[ExecutedWriteSummary]) => Unit): Seq[Row] = {
assert(table.tableType != CatalogTableType.VIEW)
assert(table.provider.isDefined)

Expand All @@ -146,7 +169,8 @@ case class CreateDataSourceTableAsSelectCommand(
}

saveDataIntoTable(
sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true,
metrics = metrics)
} else {
assert(table.schema.isEmpty)

Expand All @@ -156,7 +180,8 @@ case class CreateDataSourceTableAsSelectCommand(
table.storage.locationUri
}
val result = saveDataIntoTable(
sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false,
metrics = metrics)
val newTable = table.copy(
storage = table.storage.copy(locationUri = tableLocation),
// We will use the schema of resolved.relation as the schema of the table (instead of
Expand All @@ -183,7 +208,8 @@ case class CreateDataSourceTableAsSelectCommand(
tableLocation: Option[URI],
data: LogicalPlan,
mode: SaveMode,
tableExists: Boolean): BaseRelation = {
tableExists: Boolean,
metrics: Map[String, SQLMetric]): BaseRelation = {
// Create the relation based on the input logical plan: `data`.
val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
val dataSource = DataSource(
Expand All @@ -195,7 +221,7 @@ case class CreateDataSourceTableAsSelectCommand(
catalogTable = if (tableExists) Some(table) else None)

try {
dataSource.writeAndRead(mode, query)
dataSource.writeAndRead(mode, query, Some(metrics))
} catch {
case ex: AnalysisException =>
logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
Expand Up @@ -32,10 +32,12 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.command.FileWritingCommandExec
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
Expand Down Expand Up @@ -436,7 +438,10 @@ case class DataSource(
* Writes the given [[LogicalPlan]] out to this [[DataSource]] and returns a [[BaseRelation]] for
* the following reading.
*/
def writeAndRead(mode: SaveMode, data: LogicalPlan): BaseRelation = {
def writeAndRead(
mode: SaveMode,
data: LogicalPlan,
externalMetrics: Option[Map[String, SQLMetric]] = None): BaseRelation = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}
Expand All @@ -446,7 +451,12 @@ case class DataSource(
dataSource.createRelation(
sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
case format: FileFormat =>
sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
val qe = sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data))
qe.executedPlan.transform {
case f: FileWritingCommandExec =>
val newCmd = f.cmd.withExternalMetrics(externalMetrics.getOrElse(null))
FileWritingCommandExec(newCmd, f.children)
}.execute()

Contributor

I don't think this is much of an improvement over the previous hack. See above comments, try to avoid changing this code at all.

Member Author

Can you provide a specific suggestion regarding this? I don't think there is a way to avoid it.

Those commands invoke other commands to write the data out. The metrics we want updated are the ones in the original physical node, not the invoked one. To do this, we pass either the callback or the metrics of the original plan into the invoked one. Do you think we have another choice?

Contributor

Hm..

So it looks like you do this in 2 places: CreateHiveTableAsSelectCommand and CreateDataSourceTableAsSelectCommand.saveDataIntoTable() -> DataSource.writeAndRead()

For the latter case, I'd pass CreateDataSourceTableAsSelectCommand.metrics through planForWritingFileFormat() to InsertIntoHadoopFsRelationCommand which already extends FileWritingCommand and could just take those existing metrics as a constructor argument.

For the former, I'm not sure, as I see InsertIntoTable does not extend FileWritingCommand. Maybe it should? Then a similar solution would apply there.

Member Author

InsertIntoTable is a general logical node representing inserting some data into a table. It is not a logical.Command, and it shouldn't be a FileWritingCommand. As it's unresolved, making it extend FileWritingCommand and carry metrics doesn't make sense either.

Contributor

Yeah, I agree that's not a good way to go..
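
For the data source path, the constructor-argument idea suggested earlier in this thread could look roughly like the following. This is a sketch with hypothetical stand-ins (`Counter`, `InsertCommand`, `CreateTableAsSelect`); it does not model `planForWritingFileFormat` or any real Spark class, and it does not cover the Hive path, which the thread leaves unresolved.

```scala
object ConstructorMetricsSketch {
  // Hypothetical stand-in for SQLMetric: a mutable counter.
  final class Counter {
    var value: Long = 0L
    def add(v: Long): Unit = value += v
  }

  // Hypothetical inner write command that accepts the caller's metrics up front.
  final case class InsertCommand(rows: Long, metrics: Map[String, Counter]) {
    // Updates whichever counters it was given; an empty map records nothing.
    def run(): Unit = metrics.get("numOutputRows").foreach(_.add(rows))
  }

  // Hypothetical outer command: it builds the inner command with its own
  // metrics, so the physical plan does not need to be rewritten afterwards.
  final case class CreateTableAsSelect(rows: Long) {
    val metrics: Map[String, Counter] = Map("numOutputRows" -> new Counter)
    def run(): Unit = InsertCommand(rows, metrics).run()
  }
}
```

Compared with transforming the executed plan and calling withExternalMetrics, the metrics are fixed when the inner command is constructed, at the cost of threading one more argument through the code that builds it.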

// Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
case _ =>