Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -189,24 +189,22 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
"Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " +
s"or change $HIVE_METASTORE_VERSION to $hiveExecutionVersion.")
}
// We recursively add all jars in the class loader chain,
// starting from the given urlClassLoader.
def addJars(urlClassLoader: URLClassLoader): Array[URL] = {
val jarsInParent = urlClassLoader.getParent match {
case parent: URLClassLoader => addJars(parent)
case other => Array.empty[URL]
}

urlClassLoader.getURLs ++ jarsInParent
// We recursively find all jars in the class loader chain,
// starting from the given classLoader.
def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
case null => Array.empty[URL]
case urlClassLoader: URLClassLoader =>
urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
case other => allJars(other.getParent)
}

val jars = Utils.getContextOrSparkClassLoader match {
case urlClassLoader: URLClassLoader => addJars(urlClassLoader)
case other =>
throw new IllegalArgumentException(
"Unable to locate hive jars to connect to metastore " +
s"using classloader ${other.getClass.getName}. " +
"Please set spark.sql.hive.metastore.jars")
val classLoader = Utils.getContextOrSparkClassLoader
val jars = allJars(classLoader)
if (jars.length == 0) {
throw new IllegalArgumentException(
"Unable to locate hive jars to connect to metastore. " +
"Please set spark.sql.hive.metastore.jars.")
}

logInfo(
Expand Down Expand Up @@ -356,9 +354,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {

override def setConf(key: String, value: String): Unit = {
super.setConf(key, value)
hiveconf.set(key, value)
executionHive.runSqlHive(s"SET $key=$value")
metadataHive.runSqlHive(s"SET $key=$value")
// If users put any Spark SQL setting in the spark conf (e.g. spark-defaults.conf),
// this setConf will be called in the constructor of the SQLContext.
// Also, calling hiveconf will create a default session containing a HiveConf, which
// will interfer with the creation of executionHive (which is a lazy val). So,
// we put hiveconf.set at the end of this method.
hiveconf.set(key, value)
}

/* A catalyst metadata catalog that points to the Hive Metastore. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,20 +707,20 @@ private[hive] case class MetastoreRelation
hiveQlTable.getMetadata
)

implicit class SchemaAttribute(f: FieldSchema) {
implicit class SchemaAttribute(f: HiveColumn) {
def toAttribute: AttributeReference = AttributeReference(
f.getName,
HiveMetastoreTypes.toDataType(f.getType),
f.name,
HiveMetastoreTypes.toDataType(f.hiveType),
// Since data can be dumped in randomly with no validation, everything is nullable.
nullable = true
)(qualifiers = Seq(alias.getOrElse(tableName)))
}

// Must be a stable value since new attributes are born here.
val partitionKeys = hiveQlTable.getPartitionKeys.map(_.toAttribute)
/** PartitionKey attributes */
val partitionKeys = table.partitionColumns.map(_.toAttribute)

/** Non-partitionKey attributes */
val attributes = hiveQlTable.getCols.map(_.toAttribute)
val attributes = table.schema.map(_.toAttribute)

val output = attributes ++ partitionKeys

Expand Down