@@ -102,8 +102,19 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
102102 * this does not necessarily need to be the same version of Hive that is used internally by
103103 * Spark SQL for execution.
104104 */
105- protected [hive] def hiveVersion : String =
106- getConf(" spark.sql.hive.version" , " 0.13.1" )
105+ protected [hive] def hiveMetastoreVersion : String =
106+ getConf(" spark.sql.hive.metastore.version" , " 0.13.1" )
107+
108+ /**
109+ * The location of the jars that should be used to instantiate the HiveMetastoreClient. This
110+ * property can be one of three options:
111+ * - a comma-separated list of jar files that could be passed to a URLClassLoader
112+ * - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This
113+ * option is only valid when using the execution version of Hive.
114+ * - maven - download the correct version of hive on demand from maven.
115+ */
116+ protected [hive] def hiveMetastoreJars : String =
117+ getConf(" spark.sql.hive.metastore.jars" , " builtin" )
107118
108119 @ transient
109120 protected [sql] lazy val substitutor = new VariableSubstitution ()
@@ -121,6 +132,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
121132 executionConf.set(
122133 " javax.jdo.option.ConnectionURL" , s " jdbc:derby:;databaseName= $localMetastore;create=true " )
123134
135+ /** The version of hive used internally by Spark SQL. */
136+ lazy val hiveExecutionVersion : String = " 0.13.1"
137+
124138 /**
125139 * The copy of the hive client that is used for execution. Currently this must always be
126140 * Hive 13 as this is the version of Hive that is packaged with Spark SQL. This copy of the
@@ -129,31 +143,71 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
129143 * for storing persistent metadata, and only point to a dummy metastore in a temporary directory.
130144 */
131145 @ transient
132- protected [hive] lazy val executionHive : ClientWrapper =
133- new IsolatedClientLoader (
134- version = IsolatedClientLoader .hiveVersion( " 13 " ),
135- isolationOn = false ,
146+ protected [hive] lazy val executionHive : ClientWrapper = {
147+ logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
148+ new ClientWrapper (
149+ version = IsolatedClientLoader .hiveVersion(hiveExecutionVersion) ,
136150 config = Map (
137151 " javax.jdo.option.ConnectionURL" ->
138- s " jdbc:derby:;databaseName= $localMetastore;create=true " ),
139- rootClassLoader = Utils .getContextOrSparkClassLoader).client. asInstanceOf [ ClientWrapper ]
152+ s " jdbc:derby:;databaseName= $localMetastore;create=true " ))
153+ }
140154 SessionState .setCurrentSessionState(executionHive.state)
141155
142156 /**
143- * The copy of the Hive client that is used to retrieve metadata from the Hive MetaStore. This
157+ * The copy of the Hive client that is used to retrieve metadata from the Hive MetaStore.
144158 * The version of the Hive client that is used here must match the metastore that is configured
145159 * in the hive-site.xml file.
146160 */
147161 @ transient
148162 protected [hive] lazy val metadataHive : ClientInterface = {
163+ val metaVersion = IsolatedClientLoader .hiveVersion(hiveMetastoreVersion)
164+
149165 // We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options
150166 // into the isolated client loader
151167 val metadataConf = new HiveConf ()
152- val allConfig = metadataConf.iterator.map(e => e.getKey -> e.getValue).toMap
168+ // `configure` goes second to override other settings.
169+ val allConfig = metadataConf.iterator.map(e => e.getKey -> e.getValue).toMap ++ configure
170+
171+ val isolatedLoader = if (hiveMetastoreJars == " builtin" ) {
172+ if (hiveExecutionVersion != hiveMetastoreVersion) {
173+ throw new IllegalArgumentException (
174+ " Builtin jars can only be used when hive execution version == hive metastore version. " +
175+ s " Execution: ${hiveExecutionVersion} != Metastore: ${hiveMetastoreVersion}. " +
176+ " Specify a valid path to the correct hive jars using spark.sql.hive.metastore.jars " +
177+ s " or change spark.sql.hive.metastore.version to ${hiveExecutionVersion}. " )
178+ }
179+ val jars = getClass.getClassLoader match {
180+ case urlClassLoader : java.net.URLClassLoader => urlClassLoader.getURLs
181+ case other =>
182+ throw new IllegalArgumentException (
183+ " Unable to locate hive jars to connect to metastore " +
184+ s " using classloader ${other.getClass.getName}. " +
185+ " Please set spark.sql.hive.metastore.jars" )
186+ }
153187
154- // Config goes second to override other settings.
155- // TODO: Support for loading the jars from an already downloaded location.
156- IsolatedClientLoader .forVersion(hiveVersion, allConfig ++ configure).client
188+ logInfo(
189+ s " Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes. " )
190+ new IsolatedClientLoader (
191+ version = metaVersion,
192+ execJars = jars.toSeq,
193+ config = allConfig,
194+ isolationOn = true )
195+ } else if (hiveMetastoreJars == " maven" ) {
196+ // TODO: Support for loading the jars from an already downloaded location.
197+ logInfo(
198+ s " Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven. " )
199+ IsolatedClientLoader .forVersion(hiveMetastoreVersion, allConfig )
200+ } else {
201+ val jars = hiveMetastoreJars.split(" ," ).map(new java.net.URL (_))
202+ logInfo(
203+ s " Initializing HiveMetastoreConnection version $hiveMetastoreVersion using $jars" )
204+ new IsolatedClientLoader (
205+ version = metaVersion,
206+ execJars = jars.toSeq,
207+ config = allConfig,
208+ isolationOn = true )
209+ }
210+ isolatedLoader.client
157211 }
158212
159213 protected [sql] override def parseSql (sql : String ): LogicalPlan = {
0 commit comments