Closed

Commits (38)
f31065f
A query plan or a SchemaRDD can print out its schema.
yhuai Jun 6, 2014
f45583b
Infer the schema of a JSON dataset (a text file with one JSON object …
yhuai Jun 6, 2014
af91b23
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 6, 2014
0576406
Add Apache license header.
yhuai Jun 6, 2014
f3ce176
After type conflict resolution, if a NullType is found, StringType is…
yhuai Jun 6, 2014
a2313a6
Address PR comments.
yhuai Jun 10, 2014
666b957
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 10, 2014
0387523
Address PR comments.
yhuai Jun 10, 2014
52a2275
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 10, 2014
8846af5
API doc.
yhuai Jun 10, 2014
65b87f0
Fix sampling...
yhuai Jun 10, 2014
4325475
If a sampled dataset is used for schema inferring, update the schema …
yhuai Jun 11, 2014
a5a4b52
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 11, 2014
8ffed79
Update the example.
yhuai Jun 11, 2014
66f9e76
Update docs and use the entire dataset to infer the schema.
yhuai Jun 13, 2014
8347f2e
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 13, 2014
6df0891
Apache header.
yhuai Jun 13, 2014
ab810b0
Make JsonRDD private.
yhuai Jun 13, 2014
d0bd412
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 13, 2014
cff84cc
Use a SchemaRDD for a JSON dataset.
yhuai Jun 16, 2014
7027634
Java API.
yhuai Jun 16, 2014
9df8c5a
Python API.
yhuai Jun 16, 2014
4fbddf0
Programming guide.
yhuai Jun 16, 2014
6d20b85
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 16, 2014
e7a6c19
SchemaRDD.javaToPython should convert a field with the StructType to …
yhuai Jun 16, 2014
83013fb
Update Java Example.
yhuai Jun 16, 2014
6a5f5ef
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 16, 2014
7ea750e
marmbrus's comments.
yhuai Jun 16, 2014
d7a005c
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 16, 2014
1f908ce
Remove extra line.
yhuai Jun 16, 2014
5428451
Newline
yhuai Jun 16, 2014
79ea9ba
Fix typos.
yhuai Jun 17, 2014
e2773a6
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 17, 2014
ce31c81
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 17, 2014
94ffdaa
Remove "get" from method names.
yhuai Jun 17, 2014
bc9ac51
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 17, 2014
ce8eedd
rxin's comments.
yhuai Jun 18, 2014
227e89e
Merge remote-tracking branch 'upstream/master' into newJson
yhuai Jun 18, 2014
20 changes: 17 additions & 3 deletions project/SparkBuild.scala
@@ -486,9 +486,23 @@ object SparkBuild extends Build {
def sqlCoreSettings = sharedSettings ++ Seq(
name := "spark-sql",
libraryDependencies ++= Seq(
"com.twitter" % "parquet-column" % parquetVersion,
"com.twitter" % "parquet-hadoop" % parquetVersion
)
"com.twitter" % "parquet-column" % parquetVersion,
"com.twitter" % "parquet-hadoop" % parquetVersion,
"com.fasterxml.jackson.core" % "jackson-core" % "2.3.2"
),
initialCommands in console :=
"""
|import org.apache.spark.sql.catalyst.analysis._
Contributor: do we need to import all of these?

Contributor: If the console is primarily for developers then I find it pretty useful to have all the sorts of things I'd want for debugging in scope. This is how hive/console is already.

|import org.apache.spark.sql.catalyst.dsl._
|import org.apache.spark.sql.catalyst.errors._
|import org.apache.spark.sql.catalyst.expressions._
|import org.apache.spark.sql.catalyst.plans.logical._
|import org.apache.spark.sql.catalyst.rules._
|import org.apache.spark.sql.catalyst.types._
|import org.apache.spark.sql.catalyst.util._
|import org.apache.spark.sql.execution
|import org.apache.spark.sql.test.TestSQLContext._
|import org.apache.spark.sql.parquet.ParquetTestData""".stripMargin
)

// Since we don't include hive in the main assembly this project also acts as an alternative
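For context: the initialCommands added above make `sbt/sbt sql/console` start a REPL with the catalyst and TestSQLContext imports preloaded, mirroring the existing hive/console. A rough sketch of a session exercising the new JSON support follows; the JSON literal is invented for illustration and not part of this diff.

    $ sbt/sbt sql/console
    scala> val json = sparkContext.parallelize("""{"name":"Yin","age":30}""" :: Nil)
    scala> jsonRDD(json).printSchema()   // prints the inferred schema as a tree
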
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -108,6 +108,9 @@ trait HiveTypeCoercion {
*
* Additionally, all types when UNION-ed with strings will be promoted to strings.
* Other string conversions are handled by PromoteStrings.
*
* A widening conversion of a value with IntegerType or LongType to FloatType,
Contributor: Maybe it's clearer if you document it this way:

Widening types might result in loss of precision in the following cases:
 - IntegerType to FloatType
 - LongType to FloatType
 - LongType to DoubleType

* or of a value with LongType to DoubleType, may result in loss of precision.
*/
object WidenTypes extends Rule[LogicalPlan] {
// See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
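A standalone illustration of the precision loss being documented here (plain Scala, not part of the diff): Float has a 24-bit mantissa and Double a 53-bit one, so large Int/Long values cannot always be widened exactly.

    // 2^53 + 1 is representable as a Long but not as a Float or a Double.
    val big: Long = (1L << 53) + 1          // 9007199254740993
    println(big.toFloat.toLong)             // 9007199254740992 -> precision lost
    println(big.toDouble.toLong)            // 9007199254740992 -> precision lost
    // Similarly, 2^24 + 1 is representable as an Int but not as a Float.
    println(16777217.toFloat.toInt)         // 16777216 -> precision lost
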
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.plans

abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
self: PlanType with Product =>
@@ -123,4 +124,8 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
case other => Nil
}.toSeq
}

def printSchema(): Unit = {
println(plans.generateSchemaTreeString(output))
}
}
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/package.scala
@@ -17,8 +17,56 @@

package org.apache.spark.sql.catalyst

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.types.{StructField, DataType, ArrayType, StructType}
Contributor: order the imports

/**
* A collection of common abstractions for query plans as well as
* a base logical plan representation.
*/
package object plans
package object plans {
def generateSchemaTreeString(schema: Seq[Attribute]): String = {
Contributor: mark this as private[sql]? or private[spark]

Contributor: We don't mark any of catalyst as private. We just don't include any of the scaladoc in the default Spark distribution.

Contributor: Is this only used in QueryPlan? If so, maybe it should just live there as a protected method.

val builder = new StringBuilder
builder.append("root\n")
val prefix = " |"
schema.foreach {
attribute => {
Contributor: put attribute on the previous line to reduce indenting, i.e.

schema.foreach { attribute =>

}

(also reduces one level of curly braces)

val name = attribute.name
val dataType = attribute.dataType
dataType match {
case fields: StructType =>
builder.append(s"$prefix-- $name: $StructType\n")
generateSchemaTreeString(fields, s"$prefix |", builder)
case ArrayType(fields: StructType) =>
builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
generateSchemaTreeString(fields, s"$prefix |", builder)
case ArrayType(elementType: DataType) =>
builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
case _ => builder.append(s"$prefix-- $name: $dataType\n")
}
}
}

builder.toString()
}

def generateSchemaTreeString(
Contributor: mark this private?

schema: StructType,
prefix: String,
builder: StringBuilder): StringBuilder = {
schema.fields.foreach {
case StructField(name, fields: StructType, _) =>
builder.append(s"$prefix-- $name: $StructType\n")
generateSchemaTreeString(fields, s"$prefix |", builder)
case StructField(name, ArrayType(fields: StructType), _) =>
builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
generateSchemaTreeString(fields, s"$prefix |", builder)
case StructField(name, ArrayType(elementType: DataType), _) =>
builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
case StructField(name, fieldType: DataType, _) =>
builder.append(s"$prefix-- $name: $fieldType\n")
}

builder
}
}
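For illustration, given a hypothetical schema with a nested struct and an array field (the field names are invented, not from this diff), generateSchemaTreeString would produce a tree string along these lines:

    root
     |-- name: StringType
     |-- address: StructType
     | |-- city: StringType
     | |-- zip: IntegerType
     |-- phoneNumbers: ArrayType[StringType]
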
5 changes: 5 additions & 0 deletions sql/core/pom.xml
@@ -53,6 +53,11 @@
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.3.2</version>
Contributor: @pwendell I think in general sub-project pom files don't specify dependency versions. Can you verify?

</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
36 changes: 36 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.SparkStrategies

import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.json._

/**
* :: AlphaComponent ::
@@ -97,6 +98,41 @@ class SQLContext(@transient val sparkContext: SparkContext)
def parquetFile(path: String): SchemaRDD =
new SchemaRDD(this, parquet.ParquetRelation(path))

/**
* Loads a JSON file, returning the result as a [[SchemaRDD]].
* Right now, we only do eager schema resolution.
*/
Contributor: Needs the @Experimental annotation too.

Contributor: here too, although with sampling

def jsonFile(
path: String,
mode: SchemaResolutionMode = EAGER_SCHEMA_RESOLUTION): SchemaRDD = {
logger.info(s"Loads a JSON file $path.")
val json = sparkContext.textFile(path)
jsonRDD(json, mode)
}

/**
* Loads an RDD[String] storing JSON objects (one object per record),
* returning the result as a [[SchemaRDD]].
* Right now, we only do eager schema resolution.
*/
def jsonRDD(
json: RDD[String],
mode: SchemaResolutionMode = EAGER_SCHEMA_RESOLUTION): SchemaRDD = {
mode match {
case EAGER_SCHEMA_RESOLUTION =>
logger.info(s"Eagerly resolve the schema without sampling.")
val logicalPlan = JsonTable.inferSchema(json)
logicalPlanToSparkQuery(logicalPlan)
case EAGER_SCHEMA_RESOLUTION_WITH_SAMPLING(fraction) =>
logger.info(s"Eagerly resolve the schema with sampling " +
s"(sampling fraction: $fraction).")
val logicalPlan = JsonTable.inferSchema(json, Some(fraction))
logicalPlanToSparkQuery(logicalPlan)
case LAZY_SCHEMA_RESOLUTION =>
Contributor: Perhaps we should just leave this out so users will get compile errors instead of runtime errors.

throw new UnsupportedOperationException("Lazy schema resolution has not been implemented.")
}
}

/**
* :: Experimental ::
* Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
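A usage sketch of the new methods; the file path, data, and table name are invented for illustration, while registerAsTable and sql come from the existing SchemaRDD/SQLContext API.

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)  // sc: an existing SparkContext

    // Default: eagerly infer the schema from the whole dataset.
    val people = sqlContext.jsonFile("examples/src/main/resources/people.json")
    people.registerAsTable("people")
    sqlContext.sql("SELECT name FROM people WHERE age > 13").collect().foreach(println)

    // A sampling fraction can be supplied instead, per the modes matched above:
    // sqlContext.jsonRDD(jsonStrings, EAGER_SCHEMA_RESOLUTION_WITH_SAMPLING(0.5))
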
sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -111,4 +111,10 @@ private[sql] trait SchemaRDDLike {
@Experimental
def saveAsTable(tableName: String): Unit =
sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd


/**
* Print the schema of this SchemaRDD.
*/
def printSchema = queryExecution.analyzed.printSchema()
Contributor: put the return type explicitly here (is it just Unit)?
We should explicitly define return types of all public functions.

}
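Continuing the sketch above: on a SchemaRDD loaded from a hypothetical people.json with name and age fields, the new method would print something like the following (the exact inferred types depend on the data):

    people.printSchema()
    // root
    //  |-- age: IntegerType
    //  |-- name: StringType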