Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
16be3e5
This commit contains three changes:
yhuai Jul 9, 2014
3fa0df5
Provide easier ways to construct a StructType.
yhuai Jul 10, 2014
90460ac
Infer the Catalyst data type from an object and cast a data value to …
yhuai Jul 10, 2014
03eec4c
Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
yhuai Jul 10, 2014
0266761
Format
yhuai Jul 10, 2014
43a45e1
Remove sql.util.package introduced in a previous commit.
yhuai Jul 11, 2014
7a6a7e5
Fix bug introduced by the change made on SQLContext.inferSchema.
yhuai Jul 11, 2014
949d6bb
When creating a SchemaRDD for a JSON dataset, users can apply an exis…
yhuai Jul 11, 2014
eca7d04
Add two apply methods which will be used to extract StructField(s) fr…
yhuai Jul 11, 2014
fc649d7
Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
yhuai Jul 11, 2014
9168b83
Update comments.
yhuai Jul 11, 2014
dcaf22f
Add a field containsNull to ArrayType to indicate if an array can con…
yhuai Jul 12, 2014
3209108
Add unit tests.
yhuai Jul 16, 2014
68525a2
Update JSON unit test.
yhuai Jul 16, 2014
b8b7db4
1. Move sql package object and package-info to sql-core.
yhuai Jul 16, 2014
2e58dbd
Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
yhuai Jul 16, 2014
c3f4a02
Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
yhuai Jul 16, 2014
42d47a3
Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
yhuai Jul 16, 2014
e495e4e
More comments.
yhuai Jul 16, 2014
85e9b51
Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
yhuai Jul 16, 2014
1d9c13a
Update applySchema API.
yhuai Jul 23, 2014
9c99bc0
Several minor updates.
yhuai Jul 23, 2014
8da1a17
Add Row.fromSeq.
yhuai Jul 23, 2014
aa92e84
Update data type tests.
yhuai Jul 23, 2014
624765c
Tests for applySchema.
yhuai Jul 23, 2014
1c9f33c
Java APIs for DataTypes and Row.
yhuai Jul 24, 2014
b9f3071
Java API for applySchema.
yhuai Jul 24, 2014
33c4fec
Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
yhuai Jul 24, 2014
d48fc7b
Minor updates.
yhuai Jul 25, 2014
1db9531
Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
yhuai Jul 25, 2014
246da96
Add java data type APIs to javadoc index.
yhuai Jul 25, 2014
1d93395
Python APIs.
yhuai Jul 28, 2014
692c0b9
Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
yhuai Jul 28, 2014
3edb3ae
Python doc.
yhuai Jul 28, 2014
1cb35fe
Add "valueContainsNull" to MapType.
yhuai Jul 28, 2014
991f860
Move "asJavaDataType" and "asScalaDataType" to DataTypeConversions.sc…
yhuai Jul 28, 2014
bd40a33
Address comments.
yhuai Jul 29, 2014
fc2bed1
Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
yhuai Jul 29, 2014
ab71f21
Format.
yhuai Jul 29, 2014
2476ed0
Minor updates.
yhuai Jul 29, 2014
03bfd95
Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
yhuai Jul 29, 2014
122d1e7
Address comments.
yhuai Jul 29, 2014
e5f8df5
Scaladoc.
yhuai Jul 29, 2014
4ceeb66
Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
yhuai Jul 29, 2014
c712fbf
Converts types of values based on defined schema.
yhuai Jul 30, 2014
a6e08b4
Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
yhuai Jul 30, 2014
1d45977
Clean up.
yhuai Jul 30, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ object Row {
* }}}
*/
def unapplySeq(row: Row): Some[Seq[Any]] = Some(row)

/**
 * Builds a [[Row]] that holds the supplied values in the order given.
 *
 * @param values the values of the new row, one per column
 * @return a [[GenericRow]] wrapping `values.toArray`
 */
def apply(values: Any*): Row = {
  val cells: Array[Any] = values.toArray
  new GenericRow(cells)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import scala.language.dynamics

import org.apache.spark.sql.catalyst.types.DataType

case object DynamicType extends DataType
case object DynamicType extends DataType {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mind adding scaladoc to explain what DynamicType is used for? (While you are at it, also add scaladoc for WrapDynamic and DynamicRow)

def simpleString: String = "dynamic"
}

case class WrapDynamic(children: Seq[Attribute]) extends Expression {
type EvaluatedType = DynamicRow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,52 +125,11 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
}.toSeq
}

/**
 * Renders the given attributes as a tree-formatted schema string.
 *
 * The output starts with a "root" line, followed by one line per attribute. Struct-typed
 * attributes, and arrays whose elements are structs, are expanded recursively through the
 * three-argument overload of `generateSchemaString`.
 *
 * @param schema the attributes to describe
 * @return the complete tree-formatted schema string
 */
protected def generateSchemaString(schema: Seq[Attribute]): String = {
  val prefix = " |"
  val out = new StringBuilder
  out.append("root\n")

  for (attr <- schema) {
    val name = attr.name
    val dataType = attr.dataType
    dataType match {
      // A struct: print its header line, then descend into its fields.
      case nested: StructType =>
        out.append(s"$prefix-- $name: $StructType\n")
        generateSchemaString(nested, s"$prefix |", out)
      // An array of structs: print its header line, then descend into the element struct.
      case ArrayType(nested: StructType) =>
        out.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
        generateSchemaString(nested, s"$prefix |", out)
      // Any other array: a single line naming the element type.
      case ArrayType(elementType: DataType) =>
        out.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
      // Everything else: a single line naming the type.
      case _ =>
        out.append(s"$prefix-- $name: $dataType\n")
    }
  }

  out.toString()
}

/**
 * Appends the tree-formatted description of every field of `schema` to `builder`.
 *
 * Struct-typed fields, and arrays whose elements are structs, are expanded recursively with a
 * deeper prefix.
 *
 * @param schema  the struct whose fields are described
 * @param prefix  the text placed in front of every line emitted at this nesting level
 * @param builder the accumulator that receives the output
 * @return the same `builder` that was passed in
 */
protected def generateSchemaString(
    schema: StructType,
    prefix: String,
    builder: StringBuilder): StringBuilder = {
  for (field <- schema.fields) {
    val name = field.name
    field.dataType match {
      // A nested struct: print its header line, then descend into its fields.
      case fields: StructType =>
        builder.append(s"$prefix-- $name: $StructType\n")
        generateSchemaString(fields, s"$prefix |", builder)
      // An array of structs: print its header line, then descend into the element struct.
      case ArrayType(fields: StructType) =>
        builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
        generateSchemaString(fields, s"$prefix |", builder)
      // Any other array: a single line naming the element type.
      case ArrayType(elementType: DataType) =>
        builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
      // Every remaining data type: a single line naming the type.
      case fieldType: DataType =>
        builder.append(s"$prefix-- $name: $fieldType\n")
    }
  }

  builder
}
def schema: StructType = StructType.fromAttributes(output)

/** Returns the output schema in the tree format. */
def schemaString: String = generateSchemaString(output)
def formattedSchemaString: String = schema.formattedSchemaString

/** Prints out the schema in the tree format */
def printSchema(): Unit = println(schemaString)
def printSchema(): Unit = println(formattedSchemaString)
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ object DataType extends RegexParsers {
"true" ^^^ true |
"false" ^^^ false


protected lazy val structType: Parser[DataType] =
"StructType\\([A-zA-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ {
case fields => new StructType(fields)
Expand Down Expand Up @@ -93,47 +92,56 @@ abstract class DataType {
}

def isPrimitive: Boolean = false

def simpleString: String
}

case object NullType extends DataType
case object NullType extends DataType {
def simpleString: String = "null"
}

/**
 * Mix-in for [[DataType]]s that report themselves as primitive.
 *
 * It overrides `isPrimitive` so that it returns `true` for every type that mixes it in.
 */
trait PrimitiveType extends DataType {
  // The result type is spelled out so the public signature does not depend on inference.
  override def isPrimitive: Boolean = true
}

abstract class NativeType extends DataType {
type JvmType
@transient val tag: TypeTag[JvmType]
val ordering: Ordering[JvmType]
private[sql] type JvmType
@transient private[sql] val tag: TypeTag[JvmType]
private[sql] val ordering: Ordering[JvmType]

@transient val classTag = {
@transient private[sql] val classTag = {
val mirror = runtimeMirror(Utils.getSparkClassLoader)
ClassTag[JvmType](mirror.runtimeClass(tag.tpe))
}
}

case object StringType extends NativeType with PrimitiveType {
type JvmType = String
@transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = String
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "string"
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While you're at it, add a blank line to separate each class.

case object BinaryType extends DataType with PrimitiveType {
type JvmType = Array[Byte]
private[sql] type JvmType = Array[Byte]
def simpleString: String = "binary"
}
case object BooleanType extends NativeType with PrimitiveType {
type JvmType = Boolean
@transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = Boolean
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "boolean"
}

case object TimestampType extends NativeType {
type JvmType = Timestamp
private[sql] type JvmType = Timestamp

@transient lazy val tag = typeTag[JvmType]
@transient private[sql] lazy val tag = typeTag[JvmType]

val ordering = new Ordering[JvmType] {
private[sql] val ordering = new Ordering[JvmType] {
def compare(x: Timestamp, y: Timestamp) = x.compareTo(y)
}

def simpleString: String = "timestamp"
}

abstract class NumericType extends NativeType with PrimitiveType {
Expand All @@ -142,7 +150,7 @@ abstract class NumericType extends NativeType with PrimitiveType {
// type parameter and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
// desugared by the compiler into an argument to the object's constructor. This means there is no
// longer a no-argument constructor and thus the JVM cannot serialize the object anymore.
val numeric: Numeric[JvmType]
private[sql] val numeric: Numeric[JvmType]
}

/** Matcher for any expressions that evaluate to [[IntegralType]]s */
Expand All @@ -154,39 +162,43 @@ object IntegralType {
}

abstract class IntegralType extends NumericType {
val integral: Integral[JvmType]
private[sql] val integral: Integral[JvmType]
}

case object LongType extends IntegralType {
type JvmType = Long
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Long]]
val integral = implicitly[Integral[Long]]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = Long
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val numeric = implicitly[Numeric[Long]]
private[sql] val integral = implicitly[Integral[Long]]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "long"
}

case object IntegerType extends IntegralType {
type JvmType = Int
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Int]]
val integral = implicitly[Integral[Int]]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = Int
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val numeric = implicitly[Numeric[Int]]
private[sql] val integral = implicitly[Integral[Int]]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "integer"
}

case object ShortType extends IntegralType {
type JvmType = Short
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Short]]
val integral = implicitly[Integral[Short]]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = Short
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val numeric = implicitly[Numeric[Short]]
private[sql] val integral = implicitly[Integral[Short]]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "short"
}

case object ByteType extends IntegralType {
type JvmType = Byte
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Byte]]
val integral = implicitly[Integral[Byte]]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = Byte
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val numeric = implicitly[Numeric[Byte]]
private[sql] val integral = implicitly[Integral[Byte]]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "byte"
}

/** Matcher for any expressions that evaluate to [[FractionalType]]s */
Expand All @@ -197,47 +209,127 @@ object FractionalType {
}
}
abstract class FractionalType extends NumericType {
val fractional: Fractional[JvmType]
private[sql] val fractional: Fractional[JvmType]
}

case object DecimalType extends FractionalType {
type JvmType = BigDecimal
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[BigDecimal]]
val fractional = implicitly[Fractional[BigDecimal]]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = BigDecimal
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val numeric = implicitly[Numeric[BigDecimal]]
private[sql] val fractional = implicitly[Fractional[BigDecimal]]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "decimal"
}

case object DoubleType extends FractionalType {
type JvmType = Double
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Double]]
val fractional = implicitly[Fractional[Double]]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = Double
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val numeric = implicitly[Numeric[Double]]
private[sql] val fractional = implicitly[Fractional[Double]]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "double"
}

case object FloatType extends FractionalType {
type JvmType = Float
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Float]]
val fractional = implicitly[Fractional[Float]]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = Float
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val numeric = implicitly[Numeric[Float]]
private[sql] val fractional = implicitly[Fractional[Float]]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "float"
}

case class ArrayType(elementType: DataType) extends DataType
case class ArrayType(elementType: DataType) extends DataType {
/**
 * Appends the tree-formatted description of this array's element type to `builder`.
 *
 * One "element" line is always written. When the element type is itself an array, struct or
 * map, that type then appends its own lines using a deeper prefix.
 *
 * @param prefix  the text placed in front of the line emitted at this nesting level
 * @param builder the accumulator that receives the output
 */
private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
  builder.append(s"${prefix}-- element: ${elementType.simpleString}\n")

  // Nested complex types are rendered one level deeper.
  val childPrefix = s"$prefix |"
  elementType match {
    case nested: ArrayType => nested.buildFormattedString(childPrefix, builder)
    case nested: StructType => nested.buildFormattedString(childPrefix, builder)
    case nested: MapType => nested.buildFormattedString(childPrefix, builder)
    case _ => // Not a nested type: nothing more to print.
  }
}

case class StructField(name: String, dataType: DataType, nullable: Boolean)
def simpleString: String = "array"
}

case class StructField(name: String, dataType: DataType, nullable: Boolean) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add scaladoc to define the semantics of nullable (nullable keys vs nullable values vs both)


/**
 * Appends the tree-formatted description of this field to `builder`.
 *
 * One line with the field's name, the simple name of its type and its nullability is always
 * written. When the field's type is an array, struct or map, that type then appends its own
 * lines using a deeper prefix.
 *
 * @param prefix  the text placed in front of the line emitted at this nesting level
 * @param builder the accumulator that receives the output
 */
private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
  builder.append(s"${prefix}-- ${name}: ${dataType.simpleString} (nullable = ${nullable})\n")

  // Nested complex types are rendered one level deeper.
  val childPrefix = s"$prefix |"
  dataType match {
    case nested: ArrayType => nested.buildFormattedString(childPrefix, builder)
    case nested: StructType => nested.buildFormattedString(childPrefix, builder)
    case nested: MapType => nested.buildFormattedString(childPrefix, builder)
    case _ => // Not a nested type: nothing more to print.
  }
}
}

/** Companion of [[StructType]]: a factory from attributes and the field-name validation. */
object StructType {

  /**
   * Builds a [[StructType]] with one [[StructField]] per attribute, copying each attribute's
   * name, data type and nullability.
   *
   * @param attributes the attributes to convert, in order
   */
  def fromAttributes(attributes: Seq[Attribute]): StructType = {
    val structFields = attributes.map { attr =>
      StructField(attr.name, attr.dataType, attr.nullable)
    }
    StructType(structFields)
  }

  /** Returns true when no two of the given fields share the same name. */
  private def validateFields(fields: Seq[StructField]): Boolean = {
    val uniqueNames = fields.map(_.name).distinct
    uniqueNames.size == fields.size
  }

  // def apply(fields: Seq[StructField]) = new StructType(fields.toIndexedSeq)
}

/**
 * A [[DataType]] describing a record made up of named [[StructField]]s.
 *
 * Construction fails (through `require`) when two fields share the same name.
 *
 * @param fields the fields of this struct, in order
 */
case class StructType(fields: Seq[StructField]) extends DataType {
  require(StructType.validateFields(fields), "Found fields with the same name.")

  /**
   * Converts every field into an [[AttributeReference]] carrying the field's name, data type
   * and nullability.
   */
  def toAttributes: Seq[AttributeReference] =
    fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())

  /**
   * Returns this schema in the tree format: a "root" line followed by the description of
   * every field.
   */
  def formattedSchemaString: String = {
    val builder = new StringBuilder
    builder.append("root\n")
    // Top-level fields are rendered exactly like the fields of a nested struct, so reuse the
    // same traversal with the root-level prefix instead of repeating the loop here.
    buildFormattedString(" |", builder)

    builder.toString()
  }

  /** Prints the tree-formatted schema to standard output. */
  def printSchema(): Unit = println(formattedSchemaString)

  /**
   * Appends the tree-formatted description of every field to `builder`.
   *
   * @param prefix  the text placed in front of every line emitted at this nesting level
   * @param builder the accumulator that receives the output
   */
  private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
    fields.foreach(field => field.buildFormattedString(prefix, builder))
  }

  /** The short name of this type. */
  def simpleString: String = "struct"
}

case class MapType(keyType: DataType, valueType: DataType) extends DataType
case class MapType(keyType: DataType, valueType: DataType) extends DataType {
private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
builder.append(s"${prefix}-- key: ${keyType.simpleString}\n")
keyType match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This matching code is duplicated about four times, as far as I can tell. Perhaps it could be factored into a single protected function in DataType.

case array: ArrayType =>
array.buildFormattedString(s"$prefix |", builder)
case struct: StructType =>
struct.buildFormattedString(s"$prefix |", builder)
case map: MapType =>
map.buildFormattedString(s"$prefix |", builder)
case _ =>
}

builder.append(s"${prefix}-- value: ${valueType.simpleString}\n")
valueType match {
case array: ArrayType =>
array.buildFormattedString(s"$prefix |", builder)
case struct: StructType =>
struct.buildFormattedString(s"$prefix |", builder)
case map: MapType =>
map.buildFormattedString(s"$prefix |", builder)
case _ =>
}
}

def simpleString: String = "map"
}
Loading