15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -264,8 +264,7 @@ private[spark] object JsonProtocol {
("Submission Time" -> submissionTime) ~
("Completion Time" -> completionTime) ~
("Failure Reason" -> failureReason) ~
("Accumulables" -> JArray(
stageInfo.accumulables.values.map(accumulableInfoToJson).toList))
("Accumulables" -> accumulablesToJson(stageInfo.accumulables.values))
}

def taskInfoToJson(taskInfo: TaskInfo): JValue = {
@@ -281,7 +280,15 @@
("Finish Time" -> taskInfo.finishTime) ~
("Failed" -> taskInfo.failed) ~
("Killed" -> taskInfo.killed) ~
("Accumulables" -> JArray(taskInfo.accumulables.toList.map(accumulableInfoToJson)))
("Accumulables" -> accumulablesToJson(taskInfo.accumulables))
}

+private lazy val accumulableBlacklist = Set("internal.metrics.updatedBlockStatuses")
+
+def accumulablesToJson(accumulables: Traversable[AccumulableInfo]): JArray = {
+JArray(accumulables
+.filterNot(_.name.exists(accumulableBlacklist.contains))
+.toList.map(accumulableInfoToJson))
+}

def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = {
@@ -376,7 +383,7 @@
("Message" -> fetchFailed.message)
case exceptionFailure: ExceptionFailure =>
val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
-val accumUpdates = JArray(exceptionFailure.accumUpdates.map(accumulableInfoToJson).toList)
+val accumUpdates = accumulablesToJson(exceptionFailure.accumUpdates)
("Class Name" -> exceptionFailure.className) ~
("Description" -> exceptionFailure.description) ~
("Stack Trace" -> stackTrace) ~
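The JsonProtocol change above funnels every accumulables list (stage, task, and exception failure) through the new accumulablesToJson helper, which drops the internal.metrics.updatedBlockStatuses accumulator before serializing, since its per-task block-status payload can be very large in event logs. Below is a minimal, self-contained sketch of that filter; AccumulableInfo here is a simplified stand-in for Spark's scheduler class, and filterForEventLog is a hypothetical name used only for illustration.

// Simplified stand-in for org.apache.spark.scheduler.AccumulableInfo.
case class AccumulableInfo(id: Long, name: Option[String], update: Option[Any])

object AccumulableFilterSketch {
  // Accumulators whose serialized form is large and not useful in the event log.
  private val accumulableBlacklist = Set("internal.metrics.updatedBlockStatuses")

  // Keep only accumulables whose (optional) name is not blacklisted.
  def filterForEventLog(accumulables: Traversable[AccumulableInfo]): List[AccumulableInfo] =
    accumulables.filterNot(_.name.exists(accumulableBlacklist.contains)).toList

  def main(args: Array[String]): Unit = {
    val input = Seq(
      AccumulableInfo(1, Some("internal.metrics.updatedBlockStatuses"), Some(Seq.empty)),
      AccumulableInfo(2, Some("internal.metrics.shuffle.read.recordsRead"), Some(10L)))
    // Only the second accumulable survives the filter.
    println(filterForEventLog(input).map(_.name))
  }
}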
@@ -43,7 +43,7 @@ class AnalysisException protected[sql] (
}

override def getMessage: String = {
-val planAnnotation = plan.map(p => s";\n$p").getOrElse("")
+val planAnnotation = Option(plan).flatten.map(p => s";\n$p").getOrElse("")
getSimpleMessage + planAnnotation
}

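The AnalysisException fix above guards against a null plan field: plan is declared as an Option, but the field itself can be null (the new SQLQuerySuite test below constructs exactly that case), and plan.map then throws a NullPointerException. Option(plan).flatten collapses null, None and Some(p) into one safe Option. A stripped-down sketch of the behaviour, assuming a simplified exception class (AnalysisExceptionSketch is not Spark's class):

class AnalysisExceptionSketch(
    val getSimpleMessage: String,
    val plan: Option[String] = None) extends Exception {

  override def getMessage: String = {
    // plan is an Option, but callers (or deserialization) can leave the field itself
    // null. Option(plan).flatten turns null, None and Some(p) into one safe Option.
    val planAnnotation = Option(plan).flatten.map(p => s";\n$p").getOrElse("")
    getSimpleMessage + planAnnotation
  }
}

object AnalysisExceptionSketchDemo extends App {
  println(new AnalysisExceptionSketch("failure", null).getMessage)            // no NPE
  println(new AnalysisExceptionSketch("failure", Some("Project [...]")).getMessage)
}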
@@ -172,12 +172,13 @@ case class FileSourceScanExec(
}

@transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
+val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
val startTime = System.nanoTime()
val ret = relation.location.listFiles(partitionFilters, dataFilters)
-val timeTaken = (System.nanoTime() - startTime) / 1000 / 1000
+val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000

metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
metrics("metadataTime").add(timeTaken)
metrics("metadataTime").add(timeTakenMs)

val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
@@ -69,6 +69,7 @@ class CatalogFileIndex(
*/
def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
if (table.partitionColumnNames.nonEmpty) {
+val startTime = System.nanoTime()
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
val partitions = selectedPartitions.map { p =>
@@ -79,8 +80,9 @@
path.makeQualified(fs.getUri, fs.getWorkingDirectory))
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
+val timeNs = System.nanoTime() - startTime
new PrunedInMemoryFileIndex(
-sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
+sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
} else {
new InMemoryFileIndex(
sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
@@ -111,7 +113,8 @@
sparkSession: SparkSession,
tableBasePath: Path,
fileStatusCache: FileStatusCache,
-override val partitionSpec: PartitionSpec)
+override val partitionSpec: PartitionSpec,
+override val metadataOpsTimeNs: Option[Long])
extends InMemoryFileIndex(
sparkSession,
partitionSpec.partitions.map(_.path),
@@ -72,4 +72,14 @@ trait FileIndex {

/** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */
def partitionSchema: StructType

+/**
+* Returns an optional metadata operation time, in nanoseconds, for listing files.
+*
+* We do file listing in query optimization (in order to get the proper statistics) and we want
+* to account for file listing time in physical execution (as metrics). To do that, we save the
+* file listing time in some implementations and physical execution calls it in this method
+* to update the metrics.
+*/
+def metadataOpsTimeNs: Option[Long] = None
}
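The new FileIndex.metadataOpsTimeNs hook above lets an implementation (such as the PrunedInMemoryFileIndex change earlier) remember how long optimizer-side file listing took, so FileSourceScanExec can fold it into the metadataTime metric during physical execution. A small self-contained sketch of that flow; TimedIndex, ListingIndex and the printed "metric" are illustrative stand-ins, not Spark APIs.

trait TimedIndex {
  def listFiles(): Seq[String]
  // By default nothing is recorded, mirroring FileIndex.metadataOpsTimeNs.
  def metadataOpsTimeNs: Option[Long] = None
}

final class ListingIndex(paths: Seq[String]) extends TimedIndex {
  private var lastListingNs: Option[Long] = None

  override def listFiles(): Seq[String] = {
    val start = System.nanoTime()
    val files = paths.map(p => s"$p/part-00000")    // pretend we hit the filesystem here
    lastListingNs = Some(System.nanoTime() - start) // remember the optimizer-side cost
    files
  }

  override def metadataOpsTimeNs: Option[Long] = lastListingNs
}

object MetadataTimeSketch extends App {
  val index = new ListingIndex(Seq("/data/t/p=1", "/data/t/p=2"))
  index.listFiles() // happens during optimization, before physical execution

  // Physical execution: measure its own listing and add the optimizer-side time.
  val optimizerMetadataTimeNs = index.metadataOpsTimeNs.getOrElse(0L)
  val start = System.nanoTime()
  val files = index.listFiles()
  val metadataTimeMs = ((System.nanoTime() - start) + optimizerMetadataTimeNs) / 1000 / 1000
  println(s"listed ${files.size} files, metadata time = $metadataTimeMs ms")
}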
@@ -68,6 +68,20 @@ case class FlatMapGroupsWithStateExec(
val encSchemaAttribs = stateEncoder.schema.toAttributes
if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else encSchemaAttribs
}
+// Get the serializer for the state, taking into account whether we need to save timestamps
+private val stateSerializer = {
+val encoderSerializer = stateEncoder.namedExpressions
+if (isTimeoutEnabled) {
+encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
+} else {
+encoderSerializer
+}
+}
+// Get the deserializer for the state. Note that this must be done in the driver, as
+// resolving and binding of deserializer expressions to the encoded type can be safely done
+// only in the driver.
+private val stateDeserializer = stateEncoder.resolveAndBind().deserializer


/** Distribute by grouping attributes */
override def requiredChildDistribution: Seq[Distribution] =
@@ -139,19 +153,9 @@ case class FlatMapGroupsWithStateExec(
ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
private val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType)

-// Converter for translating state rows to Java objects
+// Converters for translating state between rows and Java objects
private val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
-stateEncoder.resolveAndBind().deserializer, stateAttributes)
-
-// Converter for translating state Java objects to rows
-private val stateSerializer = {
-val encoderSerializer = stateEncoder.namedExpressions
-if (isTimeoutEnabled) {
-encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
-} else {
-encoderSerializer
-}
-}
+stateDeserializer, stateAttributes)
private val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)

// Index of the additional metadata fields in the state row
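The FlatMapGroupsWithStateExec change above moves stateSerializer and stateDeserializer out of the executor-side code path and into constructor-time vals, because resolving and binding the deserializer expressions is only safe on the driver, while the resulting values are simply captured by the task closure. A schematic sketch of that pattern follows; Deserializer and StatefulOperatorSketch are stand-ins, not Spark classes.

// Stand-in for a resolved and bound deserializer expression.
final case class Deserializer(expr: String)

class StatefulOperatorSketch(isTimeoutEnabled: Boolean) extends Serializable {
  // Driver-side: "resolve and bind" once, when the physical operator is constructed.
  private val stateDeserializer: Deserializer = resolveAndBind()

  private def resolveAndBind(): Deserializer = {
    // In Spark this consults the session's analyzer, which only exists on the driver.
    Deserializer(if (isTimeoutEnabled) "state + timeoutTimestamp" else "state")
  }

  // Executor-side: the task closure only captures the already-resolved value.
  def processPartition(rows: Iterator[String]): Iterator[String] =
    rows.map(r => s"decode($r) with ${stateDeserializer.expr}")
}

object StatefulOperatorSketchDemo extends App {
  val op = new StatefulOperatorSketch(isTimeoutEnabled = true)
  println(op.processPartition(Iterator("row1", "row2")).toList)
}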
@@ -2598,4 +2598,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
assert(!jobStarted.get(), "Command should not trigger a Spark job.")
}

test("SPARK-20164: AnalysisException should be tolerant to null query plan") {
try {
throw new AnalysisException("", None, None, plan = null)
} catch {
case ae: AnalysisException => assert(ae.plan == null && ae.getMessage == ae.getSimpleMessage)
}
}
}
@@ -909,7 +909,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

test("max files per trigger - incorrect values") {
testQuietly("max files per trigger - incorrect values") {
val testTable = "maxFilesPerTrigger_test"
withTable(testTable) {
withTempDir { case src =>
@@ -1326,7 +1326,7 @@

import testImplicits._

test("file source stress test") {
testQuietly("file source stress test") {
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")

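Several suites in this patch (here, and later in StreamSuite and StreamingQuerySuite) switch from test to testQuietly for cases that intentionally provoke errors, so the expected exception stack traces do not flood the build log. A hedged sketch of what such a helper can look like; this is an illustration built on ScalaTest and log4j, not Spark's exact SQLTestUtils implementation.

import org.apache.log4j.{Level, LogManager}
import org.scalatest.FunSuite

trait QuietTests { self: FunSuite =>
  // Run the test body with log output suppressed, restoring the level afterwards.
  def testQuietly(name: String)(body: => Unit): Unit = test(name) {
    val rootLogger = LogManager.getRootLogger
    val originalLevel = rootLogger.getLevel
    rootLogger.setLevel(Level.OFF) // silence expected error logs for this test only
    try body finally rootLogger.setLevel(originalLevel)
  }
}

class ExampleSuite extends FunSuite with QuietTests {
  testQuietly("expected failure stays out of the logs") {
    intercept[IllegalStateException] { throw new IllegalStateException("boom") }
  }
}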
@@ -21,6 +21,8 @@ import java.sql.Date
import java.util.concurrent.ConcurrentHashMap

import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.PatienceConfiguration.Timeout

import org.apache.spark.SparkException
import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
@@ -574,11 +576,10 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
assertNumStateRows(total = 1, updated = 2),

StopStream,
-StartStream(ProcessingTime("1 second"), triggerClock = clock),
-AdvanceManualClock(10 * 1000),
+StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),

AddData(inputData, "c"),
-AdvanceManualClock(1 * 1000),
+AdvanceManualClock(11 * 1000),
CheckLastBatch(("b", "-1"), ("c", "1")),
assertNumStateRows(total = 1, updated = 2),

@@ -426,7 +426,7 @@ class StreamSuite extends StreamTest {
CheckAnswer((1, 2), (2, 2), (3, 2)))
}

test("recover from a Spark v2.1 checkpoint") {
testQuietly("recover from a Spark v2.1 checkpoint") {
var inputData: MemoryStream[Int] = null
var query: DataStreamWriter[Row] = null

@@ -488,8 +488,27 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {

case a: AddData =>
try {
-// Add data and get the source where it was added, and the expected offset of the
-// added data.

+// If the query is running with manual clock, then wait for the stream execution
+// thread to start waiting for the clock to increment. This is needed so that we
+// are adding data when there is no trigger that is active. This would ensure that
+// the data gets deterministically added to the next batch triggered after the manual
+// clock is incremented in following AdvanceManualClock. This avoid race conditions
+// between the test thread and the stream execution thread in tests using manual
+// clock.
+if (currentStream != null &&
+currentStream.triggerClock.isInstanceOf[StreamManualClock]) {
+val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
+eventually("Error while synchronizing with manual clock before adding data") {
+if (currentStream.isActive) {
+assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+}
+}
+if (!currentStream.isActive) {
+failTest("Query terminated while synchronizing with manual clock")
+}
+}
+// Add data
val queryToUse = Option(currentStream).orElse(Option(lastStream))
val (source, offset) = a.addData(queryToUse)

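The StreamTest change above makes AddData wait, via eventually, until the stream execution thread is parked on the StreamManualClock before data is added, so the data deterministically lands in the batch fired by the following AdvanceManualClock. A compact sketch of that synchronization pattern; ManualClockStandIn mimics only the two clock methods used here and is not Spark's StreamManualClock.

import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

final class ManualClockStandIn {
  private var currentMs = 0L
  private var waiting = false

  def getTimeMillis(): Long = synchronized { currentMs }
  def isStreamWaitingAt(time: Long): Boolean = synchronized { waiting && currentMs == time }

  // Called by the "stream execution" thread: block until the clock reaches `target`.
  def waitTillTime(target: Long): Unit = synchronized {
    waiting = true
    while (currentMs < target) wait()
    waiting = false
  }

  // Called by the test thread, like AdvanceManualClock.
  def advance(ms: Long): Unit = synchronized {
    currentMs += ms
    notifyAll()
  }
}

object ClockSyncSketch extends App {
  val clock = new ManualClockStandIn
  val streamThread = new Thread(() => clock.waitTillTime(1000))
  streamThread.start()

  // Wait (with retries) until the worker thread is really parked on the clock,
  // so "adding data" below cannot race with an in-flight trigger.
  eventually(Timeout(10.seconds)) {
    assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
  }

  // ... add data here, then advance the clock to fire the next trigger.
  clock.advance(1000)
  streamThread.join()
  println("trigger completed deterministically")
}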
@@ -487,7 +487,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
}

test("StreamingQuery should be Serializable but cannot be used in executors") {
testQuietly("StreamingQuery should be Serializable but cannot be used in executors") {
def startQuery(ds: Dataset[Int], queryName: String): StreamingQuery = {
ds.writeStream
.queryName(queryName)
@@ -28,11 +28,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
-import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
import org.apache.spark.sql.types._

@@ -48,14 +44,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
import HiveMetastoreCatalog._

-private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
-
-def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
-QualifiedTableName(
-tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase,
-tableIdent.table.toLowerCase)
-}

/** These locks guard against multiple attempts to instantiate a table, which wastes memory. */
private val tableCreationLocks = Striped.lazyWeakLock(100)

@@ -68,11 +56,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}

-def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
-// Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
-val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
-val dbLocation = sparkSession.sharedState.externalCatalog.getDatabase(dbName).locationUri
-new Path(new Path(dbLocation), tblName).toString
+// For testing only
+private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
+val key = QualifiedTableName(
+table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
+table.table.toLowerCase)
+tableRelationCache.getIfPresent(key)
}

private def getCached(
@@ -122,7 +111,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}

-private def convertToLogicalRelation(
+def convertToLogicalRelation(
relation: CatalogRelation,
options: Map[String, String],
fileFormatClass: Class[_ <: FileFormat],
@@ -273,78 +262,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
case NonFatal(ex) =>
logWarning(s"Unable to save case-sensitive schema for table ${identifier.unquotedString}", ex)
}

-/**
-* When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
-* data source relations for better performance.
-*/
-object ParquetConversions extends Rule[LogicalPlan] {
-private def shouldConvertMetastoreParquet(relation: CatalogRelation): Boolean = {
-relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
-sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
-}
-
-private def convertToParquetRelation(relation: CatalogRelation): LogicalRelation = {
-val fileFormatClass = classOf[ParquetFileFormat]
-val mergeSchema = sessionState.conf.getConf(
-HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
-val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString)
-
-convertToLogicalRelation(relation, options, fileFormatClass, "parquet")
-}
-
-override def apply(plan: LogicalPlan): LogicalPlan = {
-plan transformUp {
-// Write path
-case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
-// Inserting into partitioned table is not supported in Parquet data source (yet).
-if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-!r.isPartitioned && shouldConvertMetastoreParquet(r) =>
-InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists)
-
-// Read path
-case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
-shouldConvertMetastoreParquet(relation) =>
-convertToParquetRelation(relation)
-}
-}
-}
-
-/**
-* When scanning Metastore ORC tables, convert them to ORC data source relations
-* for better performance.
-*/
-object OrcConversions extends Rule[LogicalPlan] {
-private def shouldConvertMetastoreOrc(relation: CatalogRelation): Boolean = {
-relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
-sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
-}
-
-private def convertToOrcRelation(relation: CatalogRelation): LogicalRelation = {
-val fileFormatClass = classOf[OrcFileFormat]
-val options = Map[String, String]()
-
-convertToLogicalRelation(relation, options, fileFormatClass, "orc")
-}
-
-override def apply(plan: LogicalPlan): LogicalPlan = {
-plan transformUp {
-// Write path
-case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
-// Inserting into partitioned table is not supported in Orc data source (yet).
-if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-!r.isPartitioned && shouldConvertMetastoreOrc(r) =>
-InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists)
-
-// Read path
-case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
-shouldConvertMetastoreOrc(relation) =>
-convertToOrcRelation(relation)
-}
-}
-}
}


private[hive] object HiveMetastoreCatalog {
def mergeWithMetastoreSchema(
metastoreSchema: StructType,
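The HiveMetastoreCatalog refactor above removes the Parquet/ORC conversion rules from this class and adds a test-only getCachedDataSourceTable that probes the session's relation cache with a lower-cased QualifiedTableName key. A rough sketch of that lookup against a Guava cache; TableIdentifier, QualifiedTableName and RelationCacheSketch here are simplified stand-ins, not Spark's catalyst classes.

import com.google.common.cache.{Cache, CacheBuilder}

final case class TableIdentifier(table: String, database: Option[String])
final case class QualifiedTableName(database: String, name: String)

class RelationCacheSketch(currentDatabase: String) {
  private val tableRelationCache: Cache[QualifiedTableName, String] =
    CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, String]()

  def put(key: QualifiedTableName, plan: String): Unit = tableRelationCache.put(key, plan)

  // Mirrors the lookup logic: database falls back to the current database, both parts
  // are lower-cased, and null is returned when nothing is cached.
  def getCached(table: TableIdentifier): String = {
    val key = QualifiedTableName(
      table.database.getOrElse(currentDatabase).toLowerCase,
      table.table.toLowerCase)
    tableRelationCache.getIfPresent(key)
  }
}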