53 changes: 37 additions & 16 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -20,9 +20,6 @@ package org.apache.spark.sql
import java.io.CharArrayWriter
import java.util.Properties

import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.unsafe.types.UTF8String

import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
@@ -42,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD, SQLExecution}
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.json.{JacksonGenerator, JSONRelation}
import org.apache.spark.sql.json.JacksonGenerator
import org.apache.spark.sql.sources.HadoopFsRelation
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
@@ -1650,8 +1647,12 @@ class DataFrame private[sql](
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input
* path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC
* and Parquet), the table is persisted in a Hive compatible format, which means other systems
* like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL
* specific format.
*
* @group output
* @deprecated As of 1.4.0, replaced by `write().saveAsTable(tableName)`.
*/
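To make the documented contract concrete, here is a minimal usage sketch (the path and table names are hypothetical, and `sqlContext` is assumed to be an existing `SQLContext` backed by a Hive metastore):

// Hypothetical example: a non-partitioned Parquet relation with a single input
// path is persisted with Hive's built-in Parquet SerDe, so Hive can read it.
val df = sqlContext.read.parquet("/data/events")
df.write.saveAsTable("events")

// A partitioned write does not meet the conditions above, so the table falls
// back to the Spark SQL specific format and is NOT readable from Hive.
df.write.partitionBy("date").saveAsTable("events_by_date")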
@@ -1669,8 +1670,12 @@ class DataFrame private[sql](
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input
* path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC
* and Parquet), the table is persisted in a Hive compatible format, which means other systems
* like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL
* specific format.
*
* @group output
* @deprecated As of 1.4.0, replaced by `write().mode(mode).saveAsTable(tableName)`.
*/
@@ -1689,8 +1694,12 @@ class DataFrame private[sql](
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input
* path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC
* and Parquet), the table is persisted in a Hive compatible format, which means other systems
* like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL
* specific format.
*
* @group output
* @deprecated As of 1.4.0, replaced by `write().format(source).saveAsTable(tableName)`.
*/
@@ -1709,8 +1718,12 @@ class DataFrame private[sql](
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input
* path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC
* and Parquet), the table is persisted in a Hive compatible format, which means other systems
* like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL
* specific format.
*
* @group output
* @deprecated As of 1.4.0, replaced by `write().mode(mode).saveAsTable(tableName)`.
*/
@@ -1728,8 +1741,12 @@ class DataFrame private[sql](
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input
* path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC
* and Parquet), the table is persisted in a Hive compatible format, which means other systems
* like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL
* specific format.
*
* @group output
* @deprecated As of 1.4.0, replaced by
* `write().format(source).mode(mode).options(options).saveAsTable(tableName)`.
@@ -1754,8 +1771,12 @@ class DataFrame private[sql](
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input
* path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC
* and Parquet), the table is persisted in a Hive compatible format, which means other systems
* like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL
* specific format.
*
* @group output
* @deprecated As of 1.4.0, replaced by
* `write().format(source).mode(mode).options(options).saveAsTable(tableName)`.
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, ResolvedDataSource}
import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils}
import org.apache.spark.sql.sources.HadoopFsRelation


/**
@@ -185,6 +186,12 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* When `mode` is `Append`, the schema of the [[DataFrame]] needs to be
* the same as that of the existing table, and format or options will be ignored.
*
* When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input
* path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC
* and Parquet), the table is persisted in a Hive compatible format, which means other systems
* like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL
* specific format.
*
* @since 1.4.0
*/
def saveAsTable(tableName: String): Unit = {
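The `Append` contract described above can be illustrated with a short hedged sketch (the table name is illustrative, and `df` is assumed to be an existing DataFrame):

import org.apache.spark.sql.SaveMode

// Appending requires the incoming schema to match the existing table; the
// existing table's format and options take precedence over anything set here.
df.write.mode(SaveMode.Append).saveAsTable("events")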
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -18,11 +18,13 @@
package org.apache.spark.sql.hive

import scala.collection.JavaConversions._
import scala.collection.mutable

import com.google.common.base.Objects
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.Warehouse
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata._
@@ -40,9 +42,59 @@ import org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}

private[hive] case class HiveSerDe(
inputFormat: Option[String] = None,
outputFormat: Option[String] = None,
serde: Option[String] = None)

private[hive] object HiveSerDe {
/**
* Get the Hive SerDe information from a data source abbreviation string or class name.
*
* @param source the source abbreviation or data source class name; the abbreviation is
*               matched case-insensitively and can be one of: SequenceFile, RCFile,
*               ORC, or Parquet
* @param hiveConf the Hive configuration, used to look up the default RCFile SerDe
* @return HiveSerDe associated with the specified source
*/
def sourceToSerDe(source: String, hiveConf: HiveConf): Option[HiveSerDe] = {
val serdeMap = Map(
"sequencefile" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")),

"rcfile" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE))),

"orc" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")),

"parquet" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")))

val key = source match {
case _ if source.startsWith("org.apache.spark.sql.parquet") => "parquet"
case _ if source.startsWith("org.apache.spark.sql.orc") => "orc"
case _ => source.toLowerCase
}

serdeMap.get(key)
}
}
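A hedged usage sketch of `sourceToSerDe` (the fresh `HiveConf` here is for illustration only; the real call site passes the session's `hiveconf`):

val hiveConf = new HiveConf()

// Abbreviations are matched case-insensitively:
HiveSerDe.sourceToSerDe("PARQUET", hiveConf) // Some(HiveSerDe(...MapredParquetInputFormat...))

// Data source class names for Parquet and ORC are recognized by prefix:
HiveSerDe.sourceToSerDe("org.apache.spark.sql.parquet.DefaultSource", hiveConf) // Parquet SerDe

// Unknown providers yield None, triggering the Spark SQL specific fallback:
HiveSerDe.sourceToSerDe("json", hiveConf) // None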

private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext)
extends Catalog with Logging {

@@ -164,15 +216,15 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
processDatabaseAndTableName(database, tableIdent.table)
}

val tableProperties = new scala.collection.mutable.HashMap[String, String]
val tableProperties = new mutable.HashMap[String, String]
tableProperties.put("spark.sql.sources.provider", provider)

// Saves optional user specified schema. Serialized JSON schema string may be too long to be
// stored into a single metastore SerDe property. In this case, we split the JSON string and
// store each part as a separate SerDe property.
if (userSpecifiedSchema.isDefined) {
userSpecifiedSchema.foreach { schema =>
val threshold = conf.schemaStringLengthThreshold
val schemaJsonString = userSpecifiedSchema.get.json
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
tableProperties.put("spark.sql.sources.schema.numParts", parts.size.toString)
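The hunk above stores the chunk count; here is a hedged sketch of how the numbered parts would be written and read back (the `spark.sql.sources.schema.part.N` property name is an assumption consistent with the `numParts` key):

// Each chunk is stored as a numbered table property next to numParts.
parts.zipWithIndex.foreach { case (part, index) =>
  tableProperties.put(s"spark.sql.sources.schema.part.$index", part)
}

// Reading back (assumed): concatenate the parts in index order.
val numParts = tableProperties("spark.sql.sources.schema.numParts").toInt
val schemaJson = (0 until numParts)
  .map(index => tableProperties(s"spark.sql.sources.schema.part.$index"))
  .mkString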
@@ -194,7 +246,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// The table does not have a specified schema, which means that the schema will be inferred
// when we load the table. So, we are not expecting partition columns and we will discover
// partitions when we load the table. However, if there are specified partition columns,
// we simplily ignore them and provide a warning message..
// we simply ignore them and provide a warning message.
logWarning(
s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
@@ -210,15 +262,95 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
ManagedTable
}

client.createTable(
val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf)
val dataSource = ResolvedDataSource(
hive, userSpecifiedSchema, partitionColumns, provider, options)

def newSparkSQLSpecificMetastoreTable(): HiveTable = {
HiveTable(
specifiedDatabase = Option(dbName),
name = tblName,
schema = Seq.empty,
partitionColumns = metastorePartitionColumns,
tableType = tableType,
properties = tableProperties.toMap,
serdeProperties = options))
serdeProperties = options)
}

def newHiveCompatibleMetastoreTable(relation: HadoopFsRelation, serde: HiveSerDe): HiveTable = {
def schemaToHiveColumn(schema: StructType): Seq[HiveColumn] = {
schema.map { field =>
HiveColumn(
name = field.name,
hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType),
comment = "")
}
}

val partitionColumns = schemaToHiveColumn(relation.partitionColumns)
val dataColumns = schemaToHiveColumn(relation.schema).filterNot(partitionColumns.contains)

HiveTable(
specifiedDatabase = Option(dbName),
name = tblName,
schema = dataColumns,
partitionColumns = partitionColumns,
tableType = tableType,
properties = tableProperties.toMap,
serdeProperties = options,
location = Some(relation.paths.head),
viewText = None, // TODO We need to place the SQL string here.
inputFormat = serde.inputFormat,
outputFormat = serde.outputFormat,
serde = serde.serde)
}
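To make the column conversion concrete, a small hedged example (the schema and expected output are illustrative, and the metastore type strings assume the usual LongType -> bigint, StringType -> string mappings of HiveMetastoreTypes.toMetastoreType):

val exampleSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))
// schemaToHiveColumn(exampleSchema) ==
//   Seq(HiveColumn("id", "bigint", ""), HiveColumn("name", "string", ""))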

// TODO: Support persisting partitioned data source relations in Hive compatible format
val hiveTable = (maybeSerDe, dataSource.relation) match {
case (Some(serde), relation: HadoopFsRelation)
if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
logInfo {
"Persisting data source relation with a single input path into Hive metastore in Hive " +
s"compatible format. Input path: ${relation.paths.head}"
}
newHiveCompatibleMetastoreTable(relation, serde)

case (Some(serde), relation: HadoopFsRelation) if relation.partitionColumns.nonEmpty =>
logWarning {
val paths = relation.paths.mkString("\n", "\n", "")
"Persisting partitioned data source relation into Hive metastore in " +
"Spark SQL specific format, which is NOT compatible with Hive. Input path(s): " +
paths
}
newSparkSQLSpecificMetastoreTable()

case (Some(serde), relation: HadoopFsRelation) =>
logWarning {
val paths = relation.paths.mkString(", ")
"Persisting data source relation with multiple input paths into Hive metastore in " +
s"Spark SQL specific format, which is NOT compatible with Hive. Input paths: " +
paths.mkString("\n", "\n", "")
}
newSparkSQLSpecificMetastoreTable()

case (Some(serde), _) =>
logWarning {
s"Data source relation is not a ${classOf[HadoopFsRelation].getSimpleName}. " +
"Persisting it into Hive metastore in Spark SQL specific format, " +
"which is NOT compatible with Hive."
}
newSparkSQLSpecificMetastoreTable()

case _ =>
logWarning {
s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
"Persisting data source relation into Hive metastore in Spark SQL specific format, " +
"which is NOT compatible with Hive."
}
newSparkSQLSpecificMetastoreTable()
}

client.createTable(hiveTable)
}
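As a hedged summary of the dispatch above, the persistence decision reduces to a pure function of three facts about the relation (a sketch, not code from the patch):

// Mirrors the match above: Hive compatible only when a SerDe mapping exists,
// the relation is a HadoopFsRelation with exactly one input path, and it is
// not partitioned; every other combination uses the Spark SQL specific format.
def persistenceFormat(hasSerDe: Boolean, numPaths: Int, partitioned: Boolean): String =
  if (hasSerDe && numPaths == 1 && !partitioned) "Hive compatible"
  else "Spark SQL specific"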

def hiveDefaultTableFilePath(tableName: String): String = {
@@ -463,7 +595,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
case p: LogicalPlan if !p.childrenResolved => p
case p: LogicalPlan if p.resolved => p
case p @ CreateTableAsSelect(table, child, allowExisting) =>
val schema = if (table.schema.size > 0) {
val schema = if (table.schema.nonEmpty) {
table.schema
} else {
child.output.map {