Commit 273c2fd

marmbrus authored and rxin committed
[SQL] SPARK-1424 Generalize insertIntoTable functions on SchemaRDDs
This makes it possible to create tables and insert into them using the DSL and SQL for the scala and java apis.

Author: Michael Armbrust <[email protected]>

Closes #354 from marmbrus/insertIntoTable and squashes the following commits:

6c6f227 [Michael Armbrust] Create random temporary files in python parquet unit tests.
f5e6d5c [Michael Armbrust] Merge remote-tracking branch 'origin/master' into insertIntoTable
765c506 [Michael Armbrust] Add to JavaAPI.
77b512c [Michael Armbrust] typos.
5c3ef95 [Michael Armbrust] use names for boolean args.
882afdf [Michael Armbrust] Change createTableAs to saveAsTable. Clean up api annotations.
d07d94b [Michael Armbrust] Add tests, support for creating parquet files and hive tables.
fa3fe81 [Michael Armbrust] Make insertInto available on JavaSchemaRDD as well. Add createTableAs function.
1 parent 63ca581 commit 273c2fd

File tree

16 files changed: +535 additions, -160 deletions


python/pyspark/sql.py

Lines changed: 10 additions & 4 deletions
@@ -106,9 +106,12 @@ def parquetFile(self, path):
         """
         Loads a Parquet file, returning the result as a L{SchemaRDD}.
 
+        >>> import tempfile, shutil
+        >>> parquetFile = tempfile.mkdtemp()
+        >>> shutil.rmtree(parquetFile)
         >>> srdd = sqlCtx.inferSchema(rdd)
-        >>> srdd.saveAsParquetFile("/tmp/tmp.parquet")
-        >>> srdd2 = sqlCtx.parquetFile("/tmp/tmp.parquet")
+        >>> srdd.saveAsParquetFile(parquetFile)
+        >>> srdd2 = sqlCtx.parquetFile(parquetFile)
         >>> srdd.collect() == srdd2.collect()
         True
         """
@@ -278,9 +281,12 @@ def saveAsParquetFile(self, path):
         that are written out using this method can be read back in as a SchemaRDD using the
         L{SQLContext.parquetFile} method.
 
+        >>> import tempfile, shutil
+        >>> parquetFile = tempfile.mkdtemp()
+        >>> shutil.rmtree(parquetFile)
         >>> srdd = sqlCtx.inferSchema(rdd)
-        >>> srdd.saveAsParquetFile("/tmp/test.parquet")
-        >>> srdd2 = sqlCtx.parquetFile("/tmp/test.parquet")
+        >>> srdd.saveAsParquetFile(parquetFile)
+        >>> srdd2 = sqlCtx.parquetFile(parquetFile)
         >>> srdd2.collect() == srdd.collect()
         True
         """

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 50 additions & 7 deletions
@@ -20,18 +20,26 @@ package org.apache.spark.sql
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.SparkContext
-import org.apache.spark.annotation.{AlphaComponent, Experimental}
+import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
 import org.apache.spark.rdd.RDD
+
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.dsl
+import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
 import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.SparkStrategies
+
+import org.apache.spark.sql.parquet.ParquetRelation
 
 /**
  * :: AlphaComponent ::
@@ -65,12 +73,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
     new this.QueryExecution { val logical = plan }
 
   /**
-   * :: Experimental ::
+   * :: DeveloperApi ::
    * Allows catalyst LogicalPlans to be executed as a SchemaRDD.  Note that the LogicalPlan
-   * interface is considered internal, and thus not guranteed to be stable.  As a result, using
-   * them directly is not reccomended.
+   * interface is considered internal, and thus not guaranteed to be stable.  As a result, using
+   * them directly is not recommended.
    */
-  @Experimental
+  @DeveloperApi
   implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan)
 
   /**
@@ -89,6 +97,39 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def parquetFile(path: String): SchemaRDD =
     new SchemaRDD(this, parquet.ParquetRelation(path))
 
+  /**
+   * :: Experimental ::
+   * Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
+   * This registered table can be used as the target of future `insertInto` operations.
+   *
+   * {{{
+   *   val sqlContext = new SQLContext(...)
+   *   import sqlContext._
+   *
+   *   case class Person(name: String, age: Int)
+   *   createParquetFile[Person]("path/to/file.parquet").registerAsTable("people")
+   *   sql("INSERT INTO people SELECT 'michael', 29")
+   * }}}
+   *
+   * @tparam A A case class type that describes the desired schema of the parquet file to be
+   *           created.
+   * @param path The path where the directory containing parquet metadata should be created.
+   *             Data inserted into this table will also be stored at this location.
+   * @param allowExisting When false, an exception will be thrown if this directory already exists.
+   * @param conf A Hadoop configuration object that can be used to specify options to the parquet
+   *             output format.
+   *
+   * @group userf
+   */
+  @Experimental
+  def createParquetFile[A <: Product : TypeTag](
+      path: String,
+      allowExisting: Boolean = true,
+      conf: Configuration = new Configuration()): SchemaRDD = {
+    new SchemaRDD(
+      this,
+      ParquetRelation.createEmpty(path, ScalaReflection.attributesFor[A], allowExisting, conf))
+  }
 
   /**
   * Registers the given RDD as a temporary table in the catalog.  Temporary tables exist only
@@ -208,9 +249,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
   }
 
   /**
+   * :: DeveloperApi ::
    * The primary workflow for executing relational queries using Spark.  Designed to allow easy
    * access to the intermediate phases of query execution for developers.
    */
+  @DeveloperApi
   protected abstract class QueryExecution {
     def logical: LogicalPlan
 
@@ -231,7 +274,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     override def toString: String =
       s"""== Logical Plan ==
          |${stringOrError(analyzed)}
-         |== Optimized Logical Plan
+         |== Optimized Logical Plan ==
          |${stringOrError(optimizedPlan)}
          |== Physical Plan ==
          |${stringOrError(executedPlan)}
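
Taken together with the `insertInto` and `saveAsTable` methods added to `SchemaRDDLike` below, `createParquetFile` enables a workflow along the lines of the following sketch. This is a minimal illustration, not code from the commit: it assumes an existing SparkContext named `sc`, and the path, table name, and sample data are illustrative only.

case class Person(name: String, age: Int)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)  // `sc` is an assumed, pre-existing SparkContext
import sqlContext._

// Create an empty parquet-backed "table" with Person's schema and register it by name.
createParquetFile[Person]("/tmp/people.parquet").registerAsTable("people")

// Insert rows using SQL...
sql("INSERT INTO people SELECT 'michael', 29")

// ...or using the DSL, by inserting an RDD of case classes
// (implicitly converted to a SchemaRDD by `import sqlContext._`).
val newPeople = sc.parallelize(Seq(Person("andy", 30), Person("justin", 19)))
newPeople.insertInto("people")

// Read the data back.
sql("SELECT * FROM people").collect().foreach(println)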

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

Lines changed: 5 additions & 23 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import net.razorvine.pickle.Pickler
 
 import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
-import org.apache.spark.annotation.{AlphaComponent, Experimental}
+import org.apache.spark.annotation.{AlphaComponent, Experimental, DeveloperApi}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
@@ -83,8 +83,6 @@ import java.util.{Map => JMap}
  *   rdd.where('key === 1).orderBy('value.asc).select('key).collect()
  * }}}
  *
- * @todo There is currently no support for creating SchemaRDDs from either Java or Python RDDs.
- *
 * @groupname Query Language Integrated Queries
 * @groupdesc Query Functions that create new queries from SchemaRDDs.  The
 *            result of all query functions is also a SchemaRDD, allowing multiple operations to be
@@ -276,8 +274,8 @@ class SchemaRDD(
   *    an `OUTER JOIN` in SQL.  When no output rows are produced by the generator for a
   *    given row, a single row will be output, with `NULL` values for each of the
   *    generated columns.
-  * @param alias an optional alias that can be used as qualif for the attributes that are produced
-  *              by this generate operation.
+  * @param alias an optional alias that can be used as qualifier for the attributes that are
+  *              produced by this generate operation.
   *
   * @group Query
   */
@@ -290,29 +288,13 @@ class SchemaRDD(
     new SchemaRDD(sqlContext, Generate(generator, join, outer, None, logicalPlan))
 
   /**
-   * :: Experimental ::
-   * Adds the rows from this RDD to the specified table.  Note in a standard [[SQLContext]] there is
-   * no notion of persistent tables, and thus queries that contain this operator will fail to
-   * optimize.  When working with an extension of a SQLContext that has a persistent catalog, such
-   * as a `HiveContext`, this operation will result in insertions to the table specified.
+   * Returns this RDD as a SchemaRDD.  Intended primarily to force the invocation of the implicit
+   * conversion from a standard RDD to a SchemaRDD.
    *
    * @group schema
    */
-  @Experimental
-  def insertInto(tableName: String, overwrite: Boolean = false) =
-    new SchemaRDD(
-      sqlContext,
-      InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite))
-
-  /**
-   * Returns this RDD as a SchemaRDD.
-   * @group schema
-   */
   def toSchemaRDD = this
 
-  /** FOR INTERNAL USE ONLY */
-  def analyze = sqlContext.analyzer(logicalPlan)
-
   private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
     val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
     this.mapPartitions { iter =>

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala

Lines changed: 54 additions & 5 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical._
 
 /**
@@ -29,14 +31,24 @@ trait SchemaRDDLike {
   private[sql] def baseSchemaRDD: SchemaRDD
 
   /**
+   * :: DeveloperApi ::
   * A lazily computed query execution workflow.  All other RDD operations are passed
-   * through to the RDD that is produced by this workflow.
+   * through to the RDD that is produced by this workflow. This workflow is produced lazily because
+   * invoking the whole query optimization pipeline can be expensive.
   *
-   * We want this to be lazy because invoking the whole query optimization pipeline can be
-   * expensive.
+   * The query execution is considered a Developer API as phases may be added or removed in future
+   * releases. This execution is only exposed to provide an interface for inspecting the various
+   * phases for debugging purposes. Applications should not depend on particular phases existing
+   * or producing any specific output, even for exactly the same query.
+   *
+   * Additionally, the RDD exposed by this execution is not designed for consumption by end users.
+   * In particular, it does not contain any schema information, and it reuses Row objects
+   * internally. This object reuse improves performance, but can make programming against the RDD
+   * more difficult. Instead end users should perform RDD operations on a SchemaRDD directly.
   */
  @transient
-  protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan)
+  @DeveloperApi
+  lazy val queryExecution = sqlContext.executePlan(logicalPlan)
 
  override def toString =
    s"""${super.toString}
@@ -45,7 +57,8 @@ trait SchemaRDDLike {
 
   /**
   * Saves the contents of this `SchemaRDD` as a parquet file, preserving the schema.  Files that
-   * are written out using this method can be read back in as a SchemaRDD using the ``function
+   * are written out using this method can be read back in as a SchemaRDD using the `parquetFile`
+   * function.
   *
   * @group schema
   */
@@ -62,4 +75,40 @@ trait SchemaRDDLike {
   def registerAsTable(tableName: String): Unit = {
     sqlContext.registerRDDAsTable(baseSchemaRDD, tableName)
   }
+
+  /**
+   * :: Experimental ::
+   * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
+   *
+   * @group schema
+   */
+  @Experimental
+  def insertInto(tableName: String, overwrite: Boolean): Unit =
+    sqlContext.executePlan(
+      InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)).toRdd
+
+  /**
+   * :: Experimental ::
+   * Appends the rows from this RDD to the specified table.
+   *
+   * @group schema
+   */
+  @Experimental
+  def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)
+
+  /**
+   * :: Experimental ::
+   * Creates a table from the the contents of this SchemaRDD.  This will fail if the table already
+   * exists.
+   *
+   * Note that this currently only works with SchemaRDDs that are created from a HiveContext as
+   * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
+   * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
+   * be the target of an `insertInto`.
+   *
+   * @group schema
+   */
+  @Experimental
+  def saveAsTable(tableName: String): Unit =
+    sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
 }
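
For completeness, here is a rough sketch of how the new `insertInto` overloads and `saveAsTable` are meant to be used. As the doc comment above notes, `saveAsTable` needs a persistent catalog, so the sketch assumes a HiveContext; the context, table name, and data are illustrative assumptions, not code from the commit.

// Assumes the Person case class from the earlier sketch and an existing SparkContext `sc`.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)  // assumed available in the build
import hiveContext._

val people = sc.parallelize(Seq(Person("michael", 29), Person("andy", 30)))

// Create a new table from the RDD's contents; this fails if "people_copy" already exists.
people.saveAsTable("people_copy")

// Append additional rows to the existing table...
people.insertInto("people_copy")

// ...or replace its contents entirely.
people.insertInto("people_copy", overwrite = true)

// queryExecution is now public and tagged @DeveloperApi; printing it shows the
// logical, optimized, and physical plans, which is useful for debugging.
println(people.toSchemaRDD.queryExecution)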

sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala

Lines changed: 58 additions & 20 deletions
@@ -17,8 +17,11 @@
 
 package org.apache.spark.sql.api.java
 
-import java.beans.{Introspector, PropertyDescriptor}
+import java.beans.Introspector
 
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
@@ -45,29 +48,42 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
     result
   }
 
+  /**
+   * :: Experimental ::
+   * Creates an empty parquet file with the schema of class `beanClass`, which can be registered as
+   * a table. This registered table can be used as the target of future insertInto` operations.
+   *
+   * {{{
+   *   JavaSQLContext sqlCtx = new JavaSQLContext(...)
+   *
+   *   sqlCtx.createParquetFile(Person.class, "path/to/file.parquet").registerAsTable("people")
+   *   sqlCtx.sql("INSERT INTO people SELECT 'michael', 29")
+   * }}}
+   *
+   * @param beanClass A java bean class object that will be used to determine the schema of the
+   *                  parquet file.          s
+   * @param path The path where the directory containing parquet metadata should be created.
+   *             Data inserted into this table will also be stored at this location.
+   * @param allowExisting When false, an exception will be thrown if this directory already exists.
+   * @param conf A Hadoop configuration object that can be used to specific options to the parquet
+   *             output format.
+   */
+  @Experimental
+  def createParquetFile(
+      beanClass: Class[_],
+      path: String,
+      allowExisting: Boolean = true,
+      conf: Configuration = new Configuration()): JavaSchemaRDD = {
+    new JavaSchemaRDD(
+      sqlContext,
+      ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf))
+  }
+
   /**
    * Applies a schema to an RDD of Java Beans.
    */
   def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): JavaSchemaRDD = {
-    // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
-    val beanInfo = Introspector.getBeanInfo(beanClass)
-
-    val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
-    val schema = fields.map { property =>
-      val dataType = property.getPropertyType match {
-        case c: Class[_] if c == classOf[java.lang.String] => StringType
-        case c: Class[_] if c == java.lang.Short.TYPE => ShortType
-        case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
-        case c: Class[_] if c == java.lang.Long.TYPE => LongType
-        case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
-        case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
-        case c: Class[_] if c == java.lang.Float.TYPE => FloatType
-        case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
-      }
-
-      AttributeReference(property.getName, dataType, true)()
-    }
-
+    val schema = getSchema(beanClass)
     val className = beanClass.getCanonicalName
     val rowRdd = rdd.rdd.mapPartitions { iter =>
       // BeanInfo is not serializable so we must rediscover it remotely for each partition.
@@ -97,4 +113,26 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
   def registerRDDAsTable(rdd: JavaSchemaRDD, tableName: String): Unit = {
     sqlContext.registerRDDAsTable(rdd.baseSchemaRDD, tableName)
   }
+
+  /** Returns a Catalyst Schema for the given java bean class. */
+  protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = {
+    // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
+    val beanInfo = Introspector.getBeanInfo(beanClass)
+
+    val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
+    fields.map { property =>
+      val dataType = property.getPropertyType match {
+        case c: Class[_] if c == classOf[java.lang.String] => StringType
+        case c: Class[_] if c == java.lang.Short.TYPE => ShortType
+        case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
+        case c: Class[_] if c == java.lang.Long.TYPE => LongType
+        case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
+        case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
+        case c: Class[_] if c == java.lang.Float.TYPE => FloatType
+        case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
+      }
+      // TODO: Nullability could be stricter.
+      AttributeReference(property.getName, dataType, nullable = true)()
+    }
+  }
 }
