@@ -49,6 +49,11 @@ public class DataTypes {
*/
public static final DataType TimestampType = TimestampType$.MODULE$;

/**
* Gets the IntervalType object.
*/
public static final DataType IntervalType = IntervalType$.MODULE$;

/**
* Gets the DoubleType object.
*/
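For reference, the Java-facing field added here and the Scala case object introduced later in this PR refer to the same singleton, so either spelling works in type checks. A minimal sketch, not part of this diff:

import org.apache.spark.sql.types.{DataTypes, IntervalType}

// Both names resolve to the same singleton instance:
assert(DataTypes.IntervalType eq IntervalType)
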
@@ -72,6 +72,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
protected val INNER = Keyword("INNER")
protected val INSERT = Keyword("INSERT")
protected val INTERSECT = Keyword("INTERSECT")
protected val INTERVAL = Keyword("INTERVAL")
protected val INTO = Keyword("INTO")
protected val IS = Keyword("IS")
protected val JOIN = Keyword("JOIN")
@@ -279,12 +280,12 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
throw new AnalysisException(s"invalid function approximate $udfName")
}
}
| APPROXIMATE ~> "(" ~> floatLit ~ ")" ~ ident ~ "(" ~ DISTINCT ~ expression <~ ")" ^^
| APPROXIMATE ~> "(" ~> unsignedFloat ~ ")" ~ ident ~ "(" ~ DISTINCT ~ expression <~ ")" ^^
{ case s ~ _ ~ udfName ~ _ ~ _ ~ exp =>
if (lexical.normalizeKeyword(udfName) == "count") {
ApproxCountDistinct(exp, s.toDouble)
} else {
throw new AnalysisException(s"invalid function approximate($floatLit) $udfName")
throw new AnalysisException(s"invalid function approximate($s) $udfName")
}
}
| CASE ~> whenThenElse ^^ CaseWhen
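
For context, the rewritten rule matches queries like the sketch below (assuming a SQLContext named sqlContext and a registered table t with a column a; not part of this diff). The second change in this hunk also fixes the error message, which previously interpolated the parser object floatLit instead of the parsed value s.

// Accepted by the APPROXIMATE ... (DISTINCT ...) rule above:
sqlContext.sql("SELECT APPROXIMATE(0.05) count(DISTINCT a) FROM t")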
@@ -309,6 +310,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
( numericLiteral
| booleanLiteral
| stringLit ^^ {case s => Literal.create(s, StringType) }
| intervalLiteral
| NULL ^^^ Literal.create(null, NullType)
)

@@ -318,21 +320,71 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
)

protected lazy val numericLiteral: Parser[Literal] =
signedNumericLiteral | unsignedNumericLiteral

protected lazy val sign: Parser[String] =
"+" | "-"

protected lazy val signedNumericLiteral: Parser[Literal] =
( sign ~ numericLit ^^ { case s ~ l => Literal(toNarrowestIntegerType(s + l)) }
| sign ~ floatLit ^^ { case s ~ f => Literal((s + f).toDouble) }
( integral ^^ { case i => Literal(toNarrowestIntegerType(i)) }
| sign.? ~ unsignedFloat ^^ { case s ~ f => Literal((s.getOrElse("") + f).toDouble) }
)

protected lazy val unsignedNumericLiteral: Parser[Literal] =
( numericLit ^^ { n => Literal(toNarrowestIntegerType(n)) }
| floatLit ^^ { f => Literal(f.toDouble) }
protected lazy val unsignedFloat: Parser[String] =
( "." ~> numericLit ^^ { u => "0." + u }
| elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
)

protected lazy val sign: Parser[String] = ("+" | "-")

protected lazy val integral: Parser[String] =
sign.? ~ numericLit ^^ { case s ~ n => s.getOrElse("") + n }
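// Illustrative examples, not part of this diff: integral matches "123", "+7",
// and "-3"; unsignedFloat matches "1.25" and ".5" (read as "0.5"). The interval
// rules below reuse integral, so each unit can carry its own sign.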

private def intervalUnit(unitName: String) =
acceptIf {
case lexical.Identifier(str) =>
val normalized = lexical.normalizeKeyword(str)
normalized == unitName || normalized == unitName + "s"
case _ => false
} {_ => "wrong interval unit"}

protected lazy val month: Parser[Int] =
integral <~ intervalUnit("month") ^^ { case num => num.toInt }

protected lazy val year: Parser[Int] =
integral <~ intervalUnit("year") ^^ { case num => num.toInt * 12 }

protected lazy val microsecond: Parser[Long] =
integral <~ intervalUnit("microsecond") ^^ { case num => num.toLong }

protected lazy val millisecond: Parser[Long] =
integral <~ intervalUnit("millisecond") ^^ { case num => num.toLong * 1000 }

protected lazy val second: Parser[Long] =
integral <~ intervalUnit("second") ^^ { case num => num.toLong * 1000 * 1000 }

protected lazy val minute: Parser[Long] =
integral <~ intervalUnit("minute") ^^ { case num => num.toLong * 1000 * 1000 * 60 }

protected lazy val hour: Parser[Long] =
integral <~ intervalUnit("hour") ^^ { case num => num.toLong * 1000 * 1000 * 3600 }

protected lazy val day: Parser[Long] =
integral <~ intervalUnit("day") ^^ { case num => num.toLong * 1000 * 1000 * 3600 * 24 }

protected lazy val week: Parser[Long] =
integral <~ intervalUnit("week") ^^ { case num => num.toLong * 1000 * 1000 * 3600 * 24 * 7 }

protected lazy val intervalLiteral: Parser[Literal] =
INTERVAL ~> year.? ~ month.? ~ week.? ~ day.? ~ hour.? ~ minute.? ~ second.? ~
millisecond.? ~ microsecond.? ^^ {
case year ~ month ~ week ~ day ~ hour ~ minute ~ second ~
millisecond ~ microsecond =>
if (!Seq(year, month, week, day, hour, minute, second,
millisecond, microsecond).exists(_.isDefined)) {
throw new AnalysisException(
"at least one time unit should be given for interval literal")
}
val months = Seq(year, month).map(_.getOrElse(0)).sum
val microseconds = Seq(week, day, hour, minute, second, millisecond, microsecond)
.map(_.getOrElse(0L)).sum
Literal.create(Interval(months, microseconds), IntervalType)
}

private def toNarrowestIntegerType(value: String): Any = {
val bigIntValue = BigDecimal(value)

@@ -343,19 +395,14 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
}
}

protected lazy val floatLit: Parser[String] =
( "." ~> unsignedNumericLiteral ^^ { u => "0." + u }
| elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
)

protected lazy val baseExpression: Parser[Expression] =
( "*" ^^^ UnresolvedStar(None)
| ident <~ "." ~ "*" ^^ { case tableName => UnresolvedStar(Option(tableName)) }
| primary
)

protected lazy val signedPrimary: Parser[Expression] =
sign ~ primary ^^ { case s ~ e => if (s == "-") UnaryMinus(e) else e}
sign ~ primary ^^ { case s ~ e => if (s == "-") UnaryMinus(e) else e }

protected lazy val attributeName: Parser[String] = acceptMatch("attribute name", {
case lexical.Identifier(str) => str
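To make the unit arithmetic of intervalLiteral concrete, here is a worked example mirroring the new test in SQLQuerySuite further down (a sketch, not part of this diff):

// "interval 3 years -3 month 7 week 123 microseconds"
val months = 3 * 12 + (-3)                           // years fold into months: 33
val micros = 7L * 1000 * 1000 * 3600 * 24 * 7 + 123L // weeks fold into microseconds: 4233600000123
// The parser thus yields Literal.create(Interval(33, 4233600000123L), IntervalType).
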
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types


/**
* The internal representation of the interval type.
*/
case class Interval(months: Int, microseconds: Long) extends Serializable
Contributor:

Actually, can you make this a Java class and move it to unsafe/src/main/java/org/apache/spark/unsafe/types?

Just leave the fields public so we can easily access them in codegen.

@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types

import org.apache.spark.annotation.DeveloperApi


/**
* :: DeveloperApi ::
* The data type representing time intervals.
*
* Please use the singleton [[DataTypes.IntervalType]].
*/
@DeveloperApi
class IntervalType private() extends DataType {
Contributor (author):

I didn't make it extend AtomicType here, as I haven't figured out how to compare intervals: 30 days and 1 month may compare differently depending on the context.
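
A quick sketch of that ambiguity using the new Interval class (illustrative dates, not part of this PR):

val oneMonth   = Interval(months = 1, microseconds = 0L)
val thirtyDays = Interval(months = 0, microseconds = 30L * 24 * 3600 * 1000 * 1000)
// Starting from 2015-02-01, oneMonth spans 28 days, less than thirtyDays;
// starting from 2015-03-01, it spans 31 days, more. No single ordering of
// Interval values is correct in every context.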


override def defaultSize: Int = 4096

private[spark] override def asNullable: IntervalType = this
}

case object IntervalType extends IntervalType
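
Because IntervalType is a case object, callers can match on it directly; a hypothetical helper (the name hasIntervalColumn is illustrative, not part of this diff):

import org.apache.spark.sql.types._

// Does a schema contain any interval columns?
def hasIntervalColumn(schema: StructType): Boolean =
  schema.fields.exists { f =>
    f.dataType match {
      case IntervalType => true
      case _ => false
    }
  }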
@@ -41,7 +41,7 @@ class TimestampType private() extends AtomicType {
private[sql] val ordering = implicitly[Ordering[InternalType]]

/**
* The default size of a value of the TimestampType is 12 bytes.
* The default size of a value of the TimestampType is 8 bytes.
*/
override def defaultSize: Int = 8

@@ -304,6 +304,9 @@ private[sql] object ResolvedDataSource {
mode: SaveMode,
options: Map[String, String],
data: DataFrame): ResolvedDataSource = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[IntervalType])) {
sys.error("Cannot save interval data type into external storage.")
Contributor:

When is this called? If it is called during analysis, it would make more sense to throw an AnalysisException, since that has better error reporting in Python.

}
val clazz: Class[_] = lookupDataSource(provider)
val relation = clazz.newInstance() match {
case dataSource: CreatableRelationProvider =>
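A sketch of what the reviewer's suggestion above might look like (hypothetical; the diff as shown still uses sys.error):

// In place of the sys.error call above, assuming org.apache.spark.sql.AnalysisException is imported:
if (data.schema.map(_.dataType).exists(_.isInstanceOf[IntervalType])) {
  throw new AnalysisException("Cannot save interval data type into external storage.")
}
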
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala (22 additions, 0 deletions)
@@ -1467,4 +1467,26 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
checkAnswer(sql("select count, sort from t"), Row(1, "a"))
}
}

test("SPARK-8753: add interval type") {
val df = sql("select interval 3 years -3 month 7 week 123 microseconds")
checkAnswer(df, Row(Interval(12 * 3 - 3, 7L * 1000 * 1000 * 3600 * 24 * 7 + 123)))
withTempPath(f => {
// Currently we don't yet support saving out values of interval data type.
intercept[RuntimeException] {
df.write.json(f.getCanonicalPath)
}
})

def checkIntervalParseError(s: String): Unit = {
val e = intercept[AnalysisException] {
sql(s)
}
assert(e.message.contains("at least one time unit should be given for interval literal"))
}

checkIntervalParseError("select interval")
// Currently we don't yet support nanosecond
checkIntervalParseError("select interval 23 nanosecond")
}
}