Skip to content

Commit cfa397c

Browse files
rxinmarmbrus
authored andcommitted
[SPARK-5193][SQL] Tighten up SQLContext API
1. Removed 2 implicits (logicalPlanToSparkQuery and baseRelationToSchemaRDD) 2. Moved extraStrategies into ExperimentalMethods. 3. Made private methods protected[sql] so they don't show up in javadocs. 4. Removed createParquetFile. 5. Added Java version of applySchema to SQLContext. Author: Reynold Xin <[email protected]> Closes #4049 from rxin/sqlContext-refactor and squashes the following commits: a326a1a [Reynold Xin] Remove createParquetFile and add applySchema for Java to SQLContext. ecd6685 [Reynold Xin] Added baseRelationToSchemaRDD back. 4a38c9b [Reynold Xin] [SPARK-5193][SQL] Tighten up SQLContext API
1 parent 13d2406 commit cfa397c

File tree

10 files changed

+152
-281
lines changed

10 files changed

+152
-281
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql
19+
20+
import org.apache.spark.annotation.Experimental
21+
22+
/**
23+
* Holder for experimental methods for the bravest. We make NO guarantee about the stability
24+
* regarding binary compatibility and source compatibility of methods here.
25+
*/
26+
@Experimental
27+
class ExperimentalMethods protected[sql](sqlContext: SQLContext) {
28+
29+
/**
30+
* Allows extra strategies to be injected into the query planner at runtime. Note this API
31+
* should be consider experimental and is not intended to be stable across releases.
32+
*/
33+
@Experimental
34+
var extraStrategies: Seq[Strategy] = Nil
35+
36+
}

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 93 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,16 @@
1717

1818
package org.apache.spark.sql
1919

20+
import java.beans.Introspector
2021
import java.util.Properties
2122

2223
import scala.collection.immutable
2324
import scala.language.implicitConversions
2425
import scala.reflect.runtime.universe.TypeTag
2526

26-
import org.apache.hadoop.conf.Configuration
2727
import org.apache.spark.SparkContext
2828
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
29+
import org.apache.spark.api.java.JavaRDD
2930
import org.apache.spark.rdd.RDD
3031
import org.apache.spark.sql.catalyst.ScalaReflection
3132
import org.apache.spark.sql.catalyst.analysis._
@@ -36,9 +37,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
3637
import org.apache.spark.sql.catalyst.rules.RuleExecutor
3738
import org.apache.spark.sql.execution._
3839
import org.apache.spark.sql.json._
39-
import org.apache.spark.sql.parquet.ParquetRelation
40-
import org.apache.spark.sql.sources.{BaseRelation, DDLParser, DataSourceStrategy, LogicalRelation}
40+
import org.apache.spark.sql.sources.{LogicalRelation, BaseRelation, DDLParser, DataSourceStrategy}
4141
import org.apache.spark.sql.types._
42+
import org.apache.spark.util.Utils
4243

4344
/**
4445
* :: AlphaComponent ::
@@ -59,7 +60,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
5960
self =>
6061

6162
// Note that this is a lazy val so we can override the default value in subclasses.
62-
private[sql] lazy val conf: SQLConf = new SQLConf
63+
protected[sql] lazy val conf: SQLConf = new SQLConf
6364

6465
/** Set Spark SQL configuration properties. */
6566
def setConf(props: Properties): Unit = conf.setConf(props)
@@ -117,15 +118,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
117118
case _ =>
118119
}
119120

120-
/**
121-
* :: DeveloperApi ::
122-
* Allows catalyst LogicalPlans to be executed as a SchemaRDD. Note that the LogicalPlan
123-
* interface is considered internal, and thus not guaranteed to be stable. As a result, using
124-
* them directly is not recommended.
125-
*/
126-
@DeveloperApi
127-
implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan)
128-
129121
/**
130122
* Creates a SchemaRDD from an RDD of case classes.
131123
*
@@ -139,8 +131,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
139131
new SchemaRDD(this, LogicalRDD(attributeSeq, rowRDD)(self))
140132
}
141133

142-
implicit def baseRelationToSchemaRDD(baseRelation: BaseRelation): SchemaRDD = {
143-
logicalPlanToSparkQuery(LogicalRelation(baseRelation))
134+
/**
135+
* Convert a [[BaseRelation]] created for external data sources into a [[SchemaRDD]].
136+
*/
137+
def baseRelationToSchemaRDD(baseRelation: BaseRelation): SchemaRDD = {
138+
new SchemaRDD(this, LogicalRelation(baseRelation))
144139
}
145140

146141
/**
@@ -181,6 +176,43 @@ class SQLContext(@transient val sparkContext: SparkContext)
181176
new SchemaRDD(this, logicalPlan)
182177
}
183178

179+
/**
180+
* Applies a schema to an RDD of Java Beans.
181+
*
182+
* WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
183+
* SELECT * queries will return the columns in an undefined order.
184+
*/
185+
def applySchema(rdd: RDD[_], beanClass: Class[_]): SchemaRDD = {
186+
val attributeSeq = getSchema(beanClass)
187+
val className = beanClass.getName
188+
val rowRdd = rdd.mapPartitions { iter =>
189+
// BeanInfo is not serializable so we must rediscover it remotely for each partition.
190+
val localBeanInfo = Introspector.getBeanInfo(
191+
Class.forName(className, true, Utils.getContextOrSparkClassLoader))
192+
val extractors =
193+
localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod)
194+
195+
iter.map { row =>
196+
new GenericRow(
197+
extractors.zip(attributeSeq).map { case (e, attr) =>
198+
DataTypeConversions.convertJavaToCatalyst(e.invoke(row), attr.dataType)
199+
}.toArray[Any]
200+
) : Row
201+
}
202+
}
203+
new SchemaRDD(this, LogicalRDD(attributeSeq, rowRdd)(this))
204+
}
205+
206+
/**
207+
* Applies a schema to an RDD of Java Beans.
208+
*
209+
* WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
210+
* SELECT * queries will return the columns in an undefined order.
211+
*/
212+
def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): SchemaRDD = {
213+
applySchema(rdd.rdd, beanClass)
214+
}
215+
184216
/**
185217
* Loads a Parquet file, returning the result as a [[SchemaRDD]].
186218
*
@@ -259,41 +291,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
259291
applySchema(rowRDD, appliedSchema)
260292
}
261293

262-
/**
263-
* :: Experimental ::
264-
* Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
265-
* This registered table can be used as the target of future `insertInto` operations.
266-
*
267-
* {{{
268-
* val sqlContext = new SQLContext(...)
269-
* import sqlContext._
270-
*
271-
* case class Person(name: String, age: Int)
272-
* createParquetFile[Person]("path/to/file.parquet").registerTempTable("people")
273-
* sql("INSERT INTO people SELECT 'michael', 29")
274-
* }}}
275-
*
276-
* @tparam A A case class type that describes the desired schema of the parquet file to be
277-
* created.
278-
* @param path The path where the directory containing parquet metadata should be created.
279-
* Data inserted into this table will also be stored at this location.
280-
* @param allowExisting When false, an exception will be thrown if this directory already exists.
281-
* @param conf A Hadoop configuration object that can be used to specify options to the parquet
282-
* output format.
283-
*
284-
* @group userf
285-
*/
286-
@Experimental
287-
def createParquetFile[A <: Product : TypeTag](
288-
path: String,
289-
allowExisting: Boolean = true,
290-
conf: Configuration = new Configuration()): SchemaRDD = {
291-
new SchemaRDD(
292-
this,
293-
ParquetRelation.createEmpty(
294-
path, ScalaReflection.attributesFor[A], allowExisting, conf, this))
295-
}
296-
297294
/**
298295
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
299296
* during the lifetime of this instance of SQLContext.
@@ -336,12 +333,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
336333
new SchemaRDD(this, catalog.lookupRelation(Seq(tableName)))
337334

338335
/**
339-
* :: DeveloperApi ::
340-
* Allows extra strategies to be injected into the query planner at runtime. Note this API
341-
* should be consider experimental and is not intended to be stable across releases.
336+
* A collection of methods that are considered experimental, but can be used to hook into
337+
* the query planner for advanced functionalities.
342338
*/
343-
@DeveloperApi
344-
var extraStrategies: Seq[Strategy] = Nil
339+
val experimental: ExperimentalMethods = new ExperimentalMethods(this)
345340

346341
protected[sql] class SparkPlanner extends SparkStrategies {
347342
val sparkContext: SparkContext = self.sparkContext
@@ -353,7 +348,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
353348
def numPartitions = self.conf.numShufflePartitions
354349

355350
def strategies: Seq[Strategy] =
356-
extraStrategies ++ (
351+
experimental.extraStrategies ++ (
357352
DataSourceStrategy ::
358353
DDLStrategy ::
359354
TakeOrdered ::
@@ -479,14 +474,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
479474
* have the same format as the one generated by `toString` in scala.
480475
* It is only used by PySpark.
481476
*/
482-
private[sql] def parseDataType(dataTypeString: String): DataType = {
477+
protected[sql] def parseDataType(dataTypeString: String): DataType = {
483478
DataType.fromJson(dataTypeString)
484479
}
485480

486481
/**
487482
* Apply a schema defined by the schemaString to an RDD. It is only used by PySpark.
488483
*/
489-
private[sql] def applySchemaToPythonRDD(
484+
protected[sql] def applySchemaToPythonRDD(
490485
rdd: RDD[Array[Any]],
491486
schemaString: String): SchemaRDD = {
492487
val schema = parseDataType(schemaString).asInstanceOf[StructType]
@@ -496,7 +491,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
496491
/**
497492
* Apply a schema defined by the schema to an RDD. It is only used by PySpark.
498493
*/
499-
private[sql] def applySchemaToPythonRDD(
494+
protected[sql] def applySchemaToPythonRDD(
500495
rdd: RDD[Array[Any]],
501496
schema: StructType): SchemaRDD = {
502497

@@ -527,4 +522,43 @@ class SQLContext(@transient val sparkContext: SparkContext)
527522

528523
new SchemaRDD(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
529524
}
525+
526+
/**
527+
* Returns a Catalyst Schema for the given java bean class.
528+
*/
529+
protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = {
530+
// TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
531+
val beanInfo = Introspector.getBeanInfo(beanClass)
532+
533+
// Note: The ordering of elements may differ from when the schema is inferred in Scala.
534+
// This is because beanInfo.getPropertyDescriptors gives no guarantees about
535+
// element ordering.
536+
val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
537+
fields.map { property =>
538+
val (dataType, nullable) = property.getPropertyType match {
539+
case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
540+
(c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)
541+
case c: Class[_] if c == classOf[java.lang.String] => (StringType, true)
542+
case c: Class[_] if c == java.lang.Short.TYPE => (ShortType, false)
543+
case c: Class[_] if c == java.lang.Integer.TYPE => (IntegerType, false)
544+
case c: Class[_] if c == java.lang.Long.TYPE => (LongType, false)
545+
case c: Class[_] if c == java.lang.Double.TYPE => (DoubleType, false)
546+
case c: Class[_] if c == java.lang.Byte.TYPE => (ByteType, false)
547+
case c: Class[_] if c == java.lang.Float.TYPE => (FloatType, false)
548+
case c: Class[_] if c == java.lang.Boolean.TYPE => (BooleanType, false)
549+
550+
case c: Class[_] if c == classOf[java.lang.Short] => (ShortType, true)
551+
case c: Class[_] if c == classOf[java.lang.Integer] => (IntegerType, true)
552+
case c: Class[_] if c == classOf[java.lang.Long] => (LongType, true)
553+
case c: Class[_] if c == classOf[java.lang.Double] => (DoubleType, true)
554+
case c: Class[_] if c == classOf[java.lang.Byte] => (ByteType, true)
555+
case c: Class[_] if c == classOf[java.lang.Float] => (FloatType, true)
556+
case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true)
557+
case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType(), true)
558+
case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
559+
case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
560+
}
561+
AttributeReference(property.getName, dataType, nullable)()
562+
}
563+
}
530564
}

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ package org.apache.spark.sql.execution
2020
import org.apache.spark.Logging
2121
import org.apache.spark.annotation.DeveloperApi
2222
import org.apache.spark.rdd.RDD
23+
import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
2324
import org.apache.spark.sql.catalyst.errors.TreeNodeException
2425
import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
2526
import org.apache.spark.sql.catalyst.plans.logical
2627
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
27-
import org.apache.spark.sql.{SQLConf, SQLContext}
2828

2929
/**
3030
* A logical command that is executed for its side-effects. `RunnableCommand`s are
@@ -137,14 +137,12 @@ case class CacheTableCommand(
137137
isLazy: Boolean) extends RunnableCommand {
138138

139139
override def run(sqlContext: SQLContext) = {
140-
import sqlContext._
141-
142-
plan.foreach(_.registerTempTable(tableName))
143-
cacheTable(tableName)
140+
plan.foreach(p => new SchemaRDD(sqlContext, p).registerTempTable(tableName))
141+
sqlContext.cacheTable(tableName)
144142

145143
if (!isLazy) {
146144
// Performs eager caching
147-
table(tableName).count()
145+
sqlContext.table(tableName).count()
148146
}
149147

150148
Seq.empty[Row]

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import scala.util.parsing.combinator.syntactical.StandardTokenParsers
2222
import scala.util.parsing.combinator.PackratParsers
2323

2424
import org.apache.spark.Logging
25-
import org.apache.spark.sql.SQLContext
25+
import org.apache.spark.sql.{SchemaRDD, SQLContext}
2626
import org.apache.spark.sql.catalyst.plans.logical._
2727
import org.apache.spark.sql.catalyst.SqlLexical
2828
import org.apache.spark.sql.execution.RunnableCommand
@@ -234,8 +234,7 @@ private [sql] case class CreateTempTableUsing(
234234

235235
def run(sqlContext: SQLContext) = {
236236
val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
237-
238-
sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
237+
new SchemaRDD(sqlContext, LogicalRelation(resolved.relation)).registerTempTable(tableName)
239238
Seq.empty
240239
}
241240
}

sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717

1818
package org.apache.spark.sql.test
1919

20+
import scala.language.implicitConversions
21+
2022
import org.apache.spark.{SparkConf, SparkContext}
21-
import org.apache.spark.sql.{SQLConf, SQLContext}
23+
import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
24+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2225

2326
/** A SQLContext that can be used for local testing. */
2427
object TestSQLContext
@@ -29,7 +32,16 @@ object TestSQLContext
2932
new SparkConf().set("spark.sql.testkey", "true"))) {
3033

3134
/** Fewer partitions to speed up testing. */
32-
private[sql] override lazy val conf: SQLConf = new SQLConf {
35+
protected[sql] override lazy val conf: SQLConf = new SQLConf {
3336
override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
3437
}
38+
39+
/**
40+
* Turn a logical plan into a SchemaRDD. This should be removed once we have an easier way to
41+
* construct SchemaRDD directly out of local data without relying on implicits.
42+
*/
43+
protected[sql] implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = {
44+
new SchemaRDD(this, plan)
45+
}
46+
3547
}

0 commit comments

Comments
 (0)