diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 2cb88919c8c8..1d2cb7acefa3 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -264,8 +264,7 @@ private[spark] object JsonProtocol {
     ("Submission Time" -> submissionTime) ~
     ("Completion Time" -> completionTime) ~
     ("Failure Reason" -> failureReason) ~
-    ("Accumulables" -> JArray(
-      stageInfo.accumulables.values.map(accumulableInfoToJson).toList))
+    ("Accumulables" -> accumulablesToJson(stageInfo.accumulables.values))
   }
 
   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
@@ -281,7 +280,15 @@ private[spark] object JsonProtocol {
     ("Finish Time" -> taskInfo.finishTime) ~
     ("Failed" -> taskInfo.failed) ~
     ("Killed" -> taskInfo.killed) ~
-    ("Accumulables" -> JArray(taskInfo.accumulables.toList.map(accumulableInfoToJson)))
+    ("Accumulables" -> accumulablesToJson(taskInfo.accumulables))
+  }
+
+  private lazy val accumulableBlacklist = Set("internal.metrics.updatedBlockStatuses")
+
+  def accumulablesToJson(accumulables: Traversable[AccumulableInfo]): JArray = {
+    JArray(accumulables
+        .filterNot(_.name.exists(accumulableBlacklist.contains))
+        .toList.map(accumulableInfoToJson))
   }
 
   def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = {
@@ -376,7 +383,7 @@ private[spark] object JsonProtocol {
         ("Message" -> fetchFailed.message)
       case exceptionFailure: ExceptionFailure =>
         val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
-        val accumUpdates = JArray(exceptionFailure.accumUpdates.map(accumulableInfoToJson).toList)
+        val accumUpdates = accumulablesToJson(exceptionFailure.accumUpdates)
         ("Class Name" -> exceptionFailure.className) ~
         ("Description" -> exceptionFailure.description) ~
         ("Stack Trace" -> stackTrace) ~
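Illustrative aside, not part of the patch: the hunks above route all accumulable serialization through the new `accumulablesToJson` helper, which silently drops the `internal.metrics.updatedBlockStatuses` accumulator. A minimal, self-contained sketch of that filtering idea follows; `AccumulableInfo` is simplified here to a name-only case class, so this is a sketch rather than Spark's actual implementation.

```scala
// Illustrative sketch only -- mirrors the filtering done by accumulablesToJson.
object AccumulableFilterSketch {
  // Stand-in for org.apache.spark.scheduler.AccumulableInfo (its name is also optional).
  case class AccumulableInfo(name: Option[String])

  private val accumulableBlacklist = Set("internal.metrics.updatedBlockStatuses")

  // Drop blacklisted accumulators; the real helper then maps the survivors
  // through accumulableInfoToJson and wraps them in a JArray.
  def filterForJson(accumulables: Seq[AccumulableInfo]): Seq[AccumulableInfo] =
    accumulables.filterNot(_.name.exists(accumulableBlacklist.contains))

  def main(args: Array[String]): Unit = {
    val kept = filterForJson(Seq(
      AccumulableInfo(Some("internal.metrics.updatedBlockStatuses")),
      AccumulableInfo(Some("internal.metrics.executorRunTime"))))
    assert(kept.flatMap(_.name) == Seq("internal.metrics.executorRunTime"))
  }
}
```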
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index ff8576157305..50ee6cd4085e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -43,7 +43,7 @@ class AnalysisException protected[sql] (
   }
 
   override def getMessage: String = {
-    val planAnnotation = plan.map(p => s";\n$p").getOrElse("")
+    val planAnnotation = Option(plan).flatten.map(p => s";\n$p").getOrElse("")
     getSimpleMessage + planAnnotation
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 239151495f4b..2fa660c4d5e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -172,12 +172,13 @@ case class FileSourceScanExec(
   }
 
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
+    val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
-    val timeTaken = (System.nanoTime() - startTime) / 1000 / 1000
+    val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
 
     metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
-    metrics("metadataTime").add(timeTaken)
+    metrics("metadataTime").add(timeTakenMs)
 
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index db0254f8d558..4046396d0e61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -69,6 +69,7 @@ class CatalogFileIndex(
    */
   def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
     if (table.partitionColumnNames.nonEmpty) {
+      val startTime = System.nanoTime()
       val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
         table.identifier, filters)
       val partitions = selectedPartitions.map { p =>
@@ -79,8 +80,9 @@ class CatalogFileIndex(
           path.makeQualified(fs.getUri, fs.getWorkingDirectory))
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
+      val timeNs = System.nanoTime() - startTime
       new PrunedInMemoryFileIndex(
-        sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
+        sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
     } else {
       new InMemoryFileIndex(
         sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
@@ -111,7 +113,8 @@ private class PrunedInMemoryFileIndex(
     sparkSession: SparkSession,
     tableBasePath: Path,
     fileStatusCache: FileStatusCache,
-    override val partitionSpec: PartitionSpec)
+    override val partitionSpec: PartitionSpec,
+    override val metadataOpsTimeNs: Option[Long])
   extends InMemoryFileIndex(
     sparkSession,
     partitionSpec.partitions.map(_.path),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
index 6b99d38fe572..094a66a2820f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -72,4 +72,14 @@ trait FileIndex {
 
   /** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */
   def partitionSchema: StructType
+
+  /**
+   * Returns an optional metadata operation time, in nanoseconds, for listing files.
+   *
+   * We do file listing in query optimization (in order to get the proper statistics) and we want
+   * to account for file listing time in physical execution (as metrics). To do that, we save the
+   * file listing time in some implementations and physical execution calls this method
+   * to update the metrics.
+   */
+  def metadataOpsTimeNs: Option[Long] = None
 }
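Illustrative aside, not part of the patch: the three files above cooperate so that file-listing work done during query optimization is still reflected in the scan's `metadataTime` metric. The sketch below restates that arithmetic with simplified stand-ins for `FileIndex` and `PrunedInMemoryFileIndex`; only the trait method name and the nanosecond-to-millisecond conversion are taken from the patch.

```scala
// Illustrative sketch only: optimizer-phase listing time is carried via
// metadataOpsTimeNs and folded into the execution-side metadataTime metric.
object MetadataTimeSketch {
  trait FileIndex {
    // Same default as the patch: indexes that did no earlier listing report None.
    def metadataOpsTimeNs: Option[Long] = None
  }

  // Stand-in for PrunedInMemoryFileIndex: remembers how long partition pruning took.
  class PrunedIndex(listingTimeNs: Long) extends FileIndex {
    override val metadataOpsTimeNs: Option[Long] = Some(listingTimeNs)
  }

  // Mirrors the timeTakenMs computation in FileSourceScanExec.selectedPartitions.
  def metadataTimeMs(index: FileIndex, execListingNs: Long): Long =
    (execListingNs + index.metadataOpsTimeNs.getOrElse(0L)) / 1000 / 1000

  def main(args: Array[String]): Unit = {
    // 2 ms listing at execution time + 3 ms listed during optimization = 5 ms reported.
    assert(metadataTimeMs(new PrunedIndex(3000000L), 2000000L) == 5)
  }
}
```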
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index c7262ea97200..e42df5dd61c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -68,6 +68,20 @@ case class FlatMapGroupsWithStateExec(
     val encSchemaAttribs = stateEncoder.schema.toAttributes
     if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else encSchemaAttribs
   }
+  // Get the serializer for the state, taking into account whether we need to save timestamps
+  private val stateSerializer = {
+    val encoderSerializer = stateEncoder.namedExpressions
+    if (isTimeoutEnabled) {
+      encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
+    } else {
+      encoderSerializer
+    }
+  }
+  // Get the deserializer for the state. Note that this must be done in the driver, as
+  // resolving and binding of deserializer expressions to the encoded type can be safely done
+  // only in the driver.
+  private val stateDeserializer = stateEncoder.resolveAndBind().deserializer
+
 
   /** Distribute by grouping attributes */
   override def requiredChildDistribution: Seq[Distribution] =
@@ -139,19 +153,9 @@ case class FlatMapGroupsWithStateExec(
       ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
     private val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType)
 
-    // Converter for translating state rows to Java objects
+    // Converters for translating state between rows and Java objects
    private val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
-      stateEncoder.resolveAndBind().deserializer, stateAttributes)
-
-    // Converter for translating state Java objects to rows
-    private val stateSerializer = {
-      val encoderSerializer = stateEncoder.namedExpressions
-      if (isTimeoutEnabled) {
-        encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
-      } else {
-        encoderSerializer
-      }
-    }
+      stateDeserializer, stateAttributes)
     private val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)
 
     // Index of the additional metadata fields in the state row
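Illustrative aside, not part of the patch: the first hunk above hoists `stateSerializer`/`stateDeserializer` out of the per-partition updater into the operator itself, so `resolveAndBind()` runs once on the driver and the executors only reuse the already-bound expressions. A rough sketch of that pattern is below; the `Encoder` trait is a simplified stand-in, not Spark's `ExpressionEncoder` API.

```scala
// Illustrative sketch only: build driver-only artifacts eagerly in the operator,
// then let the executor-side closure merely capture and use them.
object DriverSideBindingSketch {
  trait Encoder[T] {
    def resolveAndBind(): Encoder[T]   // safe only where the analyzed schema is available
    def deserializer: AnyRef
    def namedExpressions: Seq[AnyRef]
  }

  class StatefulOperator[T](stateEncoder: Encoder[T], isTimeoutEnabled: Boolean) {
    // Evaluated in the constructor, i.e. on the driver, then captured by the
    // partition-processing closure that ships to the executors.
    private val stateDeserializer = stateEncoder.resolveAndBind().deserializer
    private val stateSerializer: Seq[AnyRef] =
      if (isTimeoutEnabled) stateEncoder.namedExpressions :+ "NO_TIMESTAMP"
      else stateEncoder.namedExpressions

    def processPartition(rows: Iterator[AnyRef]): Iterator[AnyRef] = {
      // Executor-side code only *uses* the pre-built expressions.
      require(stateDeserializer != null && stateSerializer.nonEmpty)
      rows
    }
  }
}
```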
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index d9e0196c5795..0dd9296a3f0f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2598,4 +2598,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
     assert(!jobStarted.get(), "Command should not trigger a Spark job.")
   }
+
+  test("SPARK-20164: AnalysisException should be tolerant to null query plan") {
+    try {
+      throw new AnalysisException("", None, None, plan = null)
+    } catch {
+      case ae: AnalysisException => assert(ae.plan == null && ae.getMessage == ae.getSimpleMessage)
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index f705da3d6a70..171877abe6e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -909,7 +909,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("max files per trigger - incorrect values") {
+  testQuietly("max files per trigger - incorrect values") {
     val testTable = "maxFilesPerTrigger_test"
     withTable(testTable) {
       withTempDir { case src =>
@@ -1326,7 +1326,7 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
 
   import testImplicits._
 
-  test("file source stress test") {
+  testQuietly("file source stress test") {
     val src = Utils.createTempDir(namePrefix = "streaming.src")
     val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index a00a1a582a97..c8e31e3ca2e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -21,6 +21,8 @@ import java.sql.Date
 import java.util.concurrent.ConcurrentHashMap
 
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
 
 import org.apache.spark.SparkException
 import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
@@ -574,11 +576,10 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
       assertNumStateRows(total = 1, updated = 2),
       StopStream,
 
-      StartStream(ProcessingTime("1 second"), triggerClock = clock),
-      AdvanceManualClock(10 * 1000),
+      StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
 
       AddData(inputData, "c"),
-      AdvanceManualClock(1 * 1000),
+      AdvanceManualClock(11 * 1000),
       CheckLastBatch(("b", "-1"), ("c", "1")),
       assertNumStateRows(total = 1, updated = 2),
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 32920f6dfa22..388f15405e70 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -426,7 +426,7 @@ class StreamSuite extends StreamTest {
       CheckAnswer((1, 2), (2, 2), (3, 2)))
   }
 
-  test("recover from a Spark v2.1 checkpoint") {
+  testQuietly("recover from a Spark v2.1 checkpoint") {
     var inputData: MemoryStream[Int] = null
     var query: DataStreamWriter[Row] = null
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 8cf179133681..951ff2ca0d68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -488,8 +488,27 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
 
         case a: AddData =>
           try {
-            // Add data and get the source where it was added, and the expected offset of the
-            // added data.
+
+            // If the query is running with manual clock, then wait for the stream execution
+            // thread to start waiting for the clock to increment. This is needed so that we
+            // are adding data when there is no trigger that is active. This would ensure that
+            // the data gets deterministically added to the next batch triggered after the manual
+            // clock is incremented in following AdvanceManualClock. This avoids race conditions
+            // between the test thread and the stream execution thread in tests using manual
+            // clock.
+            if (currentStream != null &&
+                currentStream.triggerClock.isInstanceOf[StreamManualClock]) {
+              val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
+              eventually("Error while synchronizing with manual clock before adding data") {
+                if (currentStream.isActive) {
+                  assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+                }
+              }
+              if (!currentStream.isActive) {
+                failTest("Query terminated while synchronizing with manual clock")
+              }
+            }
+            // Add data
             val queryToUse = Option(currentStream).orElse(Option(lastStream))
             val (source, offset) = a.addData(queryToUse)
 
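Illustrative aside, not part of the patch: the hunk above makes `AddData` wait until the streaming thread is parked on the manual clock before injecting data, so the data deterministically lands in the batch fired by the next `AdvanceManualClock`. The sketch below shows the same wait-until-blocked idea with plain stand-ins for `StreamManualClock` and the query handle; it is not the StreamTest API itself.

```scala
// Illustrative sketch only: poll until the stream is blocked on the manual clock
// (or has terminated) before the test thread adds data.
object ManualClockSyncSketch {
  trait ManualClock {
    def getTimeMillis(): Long
    def isStreamWaitingAt(time: Long): Boolean
  }
  trait QueryHandle {
    def isActive: Boolean
    def triggerClock: ManualClock
  }

  def awaitStreamWaiting(query: QueryHandle, maxRetries: Int = 100): Unit = {
    var remaining = maxRetries
    def waiting = query.triggerClock.isStreamWaitingAt(query.triggerClock.getTimeMillis())
    while (query.isActive && !waiting && remaining > 0) {
      Thread.sleep(10)   // crude stand-in for ScalaTest's eventually
      remaining -= 1
    }
    // Adding data is only safe once no trigger is in flight.
    require(!query.isActive || waiting, "stream never blocked on the manual clock")
  }
}
```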
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 3f41ecdb7ff6..1172531fe998 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -487,7 +487,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
-  test("StreamingQuery should be Serializable but cannot be used in executors") {
+  testQuietly("StreamingQuery should be Serializable but cannot be used in executors") {
     def startQuery(ds: Dataset[Int], queryName: String): StreamingQuery = {
       ds.writeStream
         .queryName(queryName)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 305bd007c93f..10f432570e94 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -28,11 +28,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
-import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
 import org.apache.spark.sql.types._
 
@@ -48,14 +44,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
 
   import HiveMetastoreCatalog._
-  private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
-
-  def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
-    QualifiedTableName(
-      tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase,
-      tableIdent.table.toLowerCase)
-  }
-
   /** These locks guard against multiple attempts to instantiate a table, which wastes memory. */
   private val tableCreationLocks = Striped.lazyWeakLock(100)
 
@@ -68,11 +56,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }
 
-  def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
-    // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
-    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
-    val dbLocation = sparkSession.sharedState.externalCatalog.getDatabase(dbName).locationUri
-    new Path(new Path(dbLocation), tblName).toString
+  // For testing only
+  private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
+    val key = QualifiedTableName(
+      table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
+      table.table.toLowerCase)
+    tableRelationCache.getIfPresent(key)
   }
 
   private def getCached(
@@ -122,7 +111,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }
 
-  private def convertToLogicalRelation(
+  def convertToLogicalRelation(
       relation: CatalogRelation,
       options: Map[String, String],
       fileFormatClass: Class[_ <: FileFormat],
@@ -273,78 +262,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       case NonFatal(ex) =>
         logWarning(s"Unable to save case-sensitive schema for table ${identifier.unquotedString}", ex)
     }
-
-  /**
-   * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
-   * data source relations for better performance.
-   */
-  object ParquetConversions extends Rule[LogicalPlan] {
-    private def shouldConvertMetastoreParquet(relation: CatalogRelation): Boolean = {
-      relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
-        sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
-    }
-
-    private def convertToParquetRelation(relation: CatalogRelation): LogicalRelation = {
-      val fileFormatClass = classOf[ParquetFileFormat]
-      val mergeSchema = sessionState.conf.getConf(
-        HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
-      val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString)
-
-      convertToLogicalRelation(relation, options, fileFormatClass, "parquet")
-    }
-
-    override def apply(plan: LogicalPlan): LogicalPlan = {
-      plan transformUp {
-        // Write path
-        case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
-          // Inserting into partitioned table is not supported in Parquet data source (yet).
-          if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-            !r.isPartitioned && shouldConvertMetastoreParquet(r) =>
-          InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists)
-
-        // Read path
-        case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
-          shouldConvertMetastoreParquet(relation) =>
-          convertToParquetRelation(relation)
-      }
-    }
-  }
-
-  /**
-   * When scanning Metastore ORC tables, convert them to ORC data source relations
-   * for better performance.
-   */
-  object OrcConversions extends Rule[LogicalPlan] {
-    private def shouldConvertMetastoreOrc(relation: CatalogRelation): Boolean = {
-      relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
-        sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
-    }
-
-    private def convertToOrcRelation(relation: CatalogRelation): LogicalRelation = {
-      val fileFormatClass = classOf[OrcFileFormat]
-      val options = Map[String, String]()
-
-      convertToLogicalRelation(relation, options, fileFormatClass, "orc")
-    }
-
-    override def apply(plan: LogicalPlan): LogicalPlan = {
-      plan transformUp {
-        // Write path
-        case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
-          // Inserting into partitioned table is not supported in Orc data source (yet).
-          if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-            !r.isPartitioned && shouldConvertMetastoreOrc(r) =>
-          InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists)
-
-        // Read path
-        case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
-          shouldConvertMetastoreOrc(relation) =>
-          convertToOrcRelation(relation)
-      }
-    }
-  }
 }
 
+
 private[hive] object HiveMetastoreCatalog {
   def mergeWithMetastoreSchema(
       metastoreSchema: StructType,
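Illustrative aside, not part of the patch: `getCachedDataSourceTable` above looks tables up in `tableRelationCache` under a lower-cased `QualifiedTableName` key, defaulting the database to the session's current one. A tiny self-contained sketch of that key construction, with stand-in case classes rather than the Catalyst types:

```scala
// Illustrative sketch only: how the relation-cache key is derived.
object RelationCacheKeySketch {
  case class TableIdentifier(table: String, database: Option[String] = None)
  case class QualifiedTableName(database: String, name: String)

  def cacheKey(table: TableIdentifier, currentDatabase: String): QualifiedTableName =
    QualifiedTableName(
      table.database.getOrElse(currentDatabase).toLowerCase,
      table.table.toLowerCase)

  def main(args: Array[String]): Unit = {
    // Unqualified identifiers pick up the current database; both parts are lower-cased.
    assert(cacheKey(TableIdentifier("MyTable"), "Default") ==
      QualifiedTableName("default", "mytable"))
  }
}
```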
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 2cc20a791d80..9e3eb2dd8234 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -26,14 +26,12 @@ import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DecimalType, DoubleType}
@@ -43,7 +41,7 @@ import org.apache.spark.util.Utils
 private[sql] class HiveSessionCatalog(
     externalCatalog: HiveExternalCatalog,
     globalTempViewManager: GlobalTempViewManager,
-    private val metastoreCatalog: HiveMetastoreCatalog,
+    val metastoreCatalog: HiveMetastoreCatalog,
     functionRegistry: FunctionRegistry,
     conf: SQLConf,
     hadoopConf: Configuration,
@@ -58,25 +56,6 @@ private[sql] class HiveSessionCatalog(
     parser,
     functionResourceLoader) {
 
-  // ----------------------------------------------------------------
-  // | Methods and fields for interacting with HiveMetastoreCatalog |
-  // ----------------------------------------------------------------
-
-  // These 2 rules must be run before all other DDL post-hoc resolution rules, i.e.
-  // `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
-  val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
-  val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
-
-  def hiveDefaultTableFilePath(name: TableIdentifier): String = {
-    metastoreCatalog.hiveDefaultTableFilePath(name)
-  }
-
-  // For testing only
-  private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
-    val key = metastoreCatalog.getQualifiedTableName(table)
-    tableRelationCache.getIfPresent(key)
-  }
-
   override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
     makeFunctionBuilder(funcName, Utils.classForName(className))
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 2f3dfa05e9ef..9d3b31f39c0f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -75,8 +75,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
 
     override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
       new DetermineTableStats(session) +:
-        catalog.ParquetConversions +:
-        catalog.OrcConversions +:
+        RelationConversions(conf, catalog) +:
         PreprocessTableCreation(session) +:
         PreprocessTableInsertion(conf) +:
         DataSourceAnalysis(conf) +:
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index b5ce027d51e7..0465e9c031e2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.hive
 
 import java.io.IOException
-import java.net.URI
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.StatsSetupConst
@@ -31,9 +30,11 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
-import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.hive.orc.OrcFileFormat
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 
 
 /**
@@ -170,6 +171,55 @@ object HiveAnalysis extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Relation conversion from metastore relations to data source relations for better performance
+ *
+ * - When writing to non-partitioned Hive-serde Parquet/Orc tables
+ * - When scanning Hive-serde Parquet/ORC tables
+ *
+ * This rule must be run before all other DDL post-hoc resolution rules, i.e.
+ * `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
+ */
+case class RelationConversions(
+    conf: SQLConf,
+    sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
+  private def isConvertible(relation: CatalogRelation): Boolean = {
+    (relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
+      conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)) ||
+      (relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
+        conf.getConf(HiveUtils.CONVERT_METASTORE_ORC))
+  }
+
+  private def convert(relation: CatalogRelation): LogicalRelation = {
+    if (relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet")) {
+      val options = Map(ParquetOptions.MERGE_SCHEMA ->
+        conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
+      sessionCatalog.metastoreCatalog
+        .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
+    } else {
+      val options = Map[String, String]()
+      sessionCatalog.metastoreCatalog
+        .convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc")
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan transformUp {
+      // Write path
+      case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
+        // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
+        if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
+          !r.isPartitioned && isConvertible(r) =>
+        InsertIntoTable(convert(r), partition, query, overwrite, ifNotExists)
+
+      // Read path
+      case relation: CatalogRelation
+        if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
+        convert(relation)
+    }
+  }
+}
+
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
   self: SparkPlanner =>
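Illustrative aside, not part of the patch: the new `RelationConversions` rule keys its decision off the table's serde string and the existing `HiveUtils.CONVERT_METASTORE_PARQUET` / `CONVERT_METASTORE_ORC` configs (user-facing keys `spark.sql.hive.convertMetastoreParquet` and `spark.sql.hive.convertMetastoreOrc`). The snippet below restates that predicate over a plain string and map, purely to illustrate the toggle; config defaults and the Catalyst plumbing are omitted.

```scala
// Illustrative sketch only: the isConvertible decision as a plain predicate.
object ConvertibleSketch {
  // User-facing keys behind HiveUtils.CONVERT_METASTORE_PARQUET / CONVERT_METASTORE_ORC.
  val ConvertParquetKey = "spark.sql.hive.convertMetastoreParquet"
  val ConvertOrcKey = "spark.sql.hive.convertMetastoreOrc"

  def isConvertible(serde: String, conf: Map[String, String]): Boolean = {
    val s = serde.toLowerCase
    (s.contains("parquet") && conf(ConvertParquetKey).toBoolean) ||
      (s.contains("orc") && conf(ConvertOrcKey).toBoolean)
  }

  def main(args: Array[String]): Unit = {
    val conf = Map(ConvertParquetKey -> "true", ConvertOrcKey -> "false")
    assert(isConvertible("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", conf))
    assert(!isConvertible("org.apache.hadoop.hive.ql.io.orc.OrcSerde", conf))
  }
}
```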
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index 0b157a45e6e0..25bd4d0017bd 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -72,8 +72,7 @@ public void setUp() throws IOException {
       path.delete();
     }
     HiveSessionCatalog catalog = (HiveSessionCatalog) sqlContext.sessionState().catalog();
-    hiveManagedPath = new Path(
-      catalog.hiveDefaultTableFilePath(new TableIdentifier("javaSavedTable")));
+    hiveManagedPath = new Path(catalog.defaultTablePath(new TableIdentifier("javaSavedTable")));
     fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
     fs.delete(hiveManagedPath, true);
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 962998ea6fb6..3191b9975fbf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -413,7 +413,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       }
       // Table lookup will make the table cached.
       spark.table(tableIndent)
-      statsBeforeUpdate = catalog.getCachedDataSourceTable(tableIndent)
+      statsBeforeUpdate = catalog.metastoreCatalog.getCachedDataSourceTable(tableIndent)
         .asInstanceOf[LogicalRelation].catalogTable.get.stats.get
 
       sql(s"INSERT INTO $tableName SELECT 2")
@@ -423,7 +423,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
         sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
       }
       spark.table(tableIndent)
-      statsAfterUpdate = catalog.getCachedDataSourceTable(tableIndent)
+      statsAfterUpdate = catalog.metastoreCatalog.getCachedDataSourceTable(tableIndent)
         .asInstanceOf[LogicalRelation].catalogTable.get.stats.get
     }
     (statsBeforeUpdate, statsAfterUpdate)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 9fc2923bb6fd..23f21e6b9931 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -449,8 +449,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     }
   }
 
-  private def getCachedDataSourceTable(id: TableIdentifier): LogicalPlan = {
-    sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCachedDataSourceTable(id)
+  private def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
+    sessionState.catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
+      .getCachedDataSourceTable(table)
   }
 
   test("Caching converted data source Parquet Relations") {