File: CalendarInterval.java (org.apache.spark.unsafe.types)
@@ -18,7 +18,6 @@
package org.apache.spark.unsafe.types;

import java.io.Serializable;
- import java.math.BigDecimal;
import java.time.Duration;
import java.time.Period;
import java.time.temporal.ChronoUnit;
@@ -80,39 +79,8 @@ public int compareTo(CalendarInterval that) {

@Override
public String toString() {
-    if (months == 0 && days == 0 && microseconds == 0) {
-      return "0 seconds";
-    }
-
-    StringBuilder sb = new StringBuilder();
-
-    if (months != 0) {
-      appendUnit(sb, months / 12, "years");
-      appendUnit(sb, months % 12, "months");
-    }
-
-    appendUnit(sb, days, "days");
-
-    if (microseconds != 0) {
-      long rest = microseconds;
-      appendUnit(sb, rest / MICROS_PER_HOUR, "hours");
-      rest %= MICROS_PER_HOUR;
-      appendUnit(sb, rest / MICROS_PER_MINUTE, "minutes");
-      rest %= MICROS_PER_MINUTE;
-      if (rest != 0) {
-        String s = BigDecimal.valueOf(rest, 6).stripTrailingZeros().toPlainString();
-        sb.append(s).append(" seconds ");
-      }
-    }
-
-    sb.setLength(sb.length() - 1);
-    return sb.toString();
-  }
-
-  private void appendUnit(StringBuilder sb, long value, String unit) {
-    if (value != 0) {
-      sb.append(value).append(' ').append(unit).append(' ');
-    }
+    return "CalendarInterval(months= " + months + ", days = " + days + ", microsecond = " +
+      microseconds + ")";
[Member] Why do we use this string representation now? Was it in order to put the same logic into IntervalUtils? If that's the case, we didn't have to move it; we could keep using this class's toString until that case becomes completely exposed.
}
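
For reference, the new debug-style output (my own example; the uneven spacing comes straight from the format string above):

  new CalendarInterval(1, 2, 3).toString()
  // "CalendarInterval(months= 1, days = 2, microsecond = 3)"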

/**
File: CalendarIntervalSuite.java
@@ -46,36 +46,6 @@ public void equalsTest() {
assertEquals(i1, i6);
}

-  @Test
-  public void toStringTest() {
-    CalendarInterval i;
-
-    i = new CalendarInterval(0, 0, 0);
-    assertEquals("0 seconds", i.toString());
-
-    i = new CalendarInterval(34, 0, 0);
-    assertEquals("2 years 10 months", i.toString());
-
-    i = new CalendarInterval(-34, 0, 0);
-    assertEquals("-2 years -10 months", i.toString());
-
-    i = new CalendarInterval(0, 31, 0);
-    assertEquals("31 days", i.toString());
-
-    i = new CalendarInterval(0, -31, 0);
-    assertEquals("-31 days", i.toString());
-
-    i = new CalendarInterval(0, 0, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123);
-    assertEquals("3 hours 13 minutes 0.000123 seconds", i.toString());
-
-    i = new CalendarInterval(0, 0, -3 * MICROS_PER_HOUR - 13 * MICROS_PER_MINUTE - 123);
-    assertEquals("-3 hours -13 minutes -0.000123 seconds", i.toString());
-
-    i = new CalendarInterval(34, 31, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123);
-    assertEquals("2 years 10 months 31 days 3 hours 13 minutes 0.000123 seconds",
-      i.toString());
-  }

@Test
public void periodAndDurationTest() {
CalendarInterval interval = new CalendarInterval(120, -40, 123456);
File: Cast.scala (org.apache.spark.sql.catalyst.expressions)
@@ -30,7 +30,9 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.IntervalUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.IntervalStyle._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.UTF8StringBuilder
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -281,6 +283,14 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression with ...

// UDFToString
private[this] def castToString(from: DataType): Any => Any = from match {
case CalendarIntervalType => SQLConf.get.intervalOutputStyle match {
case SQL_STANDARD =>
buildCast[CalendarInterval](_, i => UTF8String.fromString(toSqlStandardString(i)))
case ISO_8601 =>
buildCast[CalendarInterval](_, i => UTF8String.fromString(toIso8601String(i)))
case _ =>
[Contributor] nit: it's recommended to match the last enum value explicitly instead of using a wildcard.

buildCast[CalendarInterval](_, i => UTF8String.fromString(toMultiUnitsString(i)))
}
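
A sketch of that nit (my illustration; MULTI_UNITS is the remaining value of the IntervalStyle enum added to SQLConf later in this diff):

  case MULTI_UNITS =>
    buildCast[CalendarInterval](_, i => UTF8String.fromString(toMultiUnitsString(i)))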
case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes)
case DateType => buildCast[Int](_, d => UTF8String.fromString(dateFormatter.format(d)))
case TimestampType => buildCast[Long](_,
@@ -985,6 +995,16 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression with ...
timestampFormatter.getClass)
(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(
org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($tf, $c));"""
case CalendarIntervalType =>
val iu = IntervalUtils.getClass.getCanonicalName.stripSuffix("$")
SQLConf.get.intervalOutputStyle match {
[Contributor] nit: compute the function name once, e.g.:

  val funcName = SQLConf.get.intervalOutputStyle match ...
  code"""$evPrim = UTF8String.fromString($iu.$funcName($c));"""

case SQL_STANDARD =>
(c, evPrim, _) => code"""$evPrim = UTF8String.fromString($iu.toSqlStandardString($c));"""
case ISO_8601 =>
(c, evPrim, _) => code"""$evPrim = UTF8String.fromString($iu.toIso8601String($c));"""
case _ =>
[Contributor] ditto (match the last enum value explicitly instead of using a wildcard).

(c, evPrim, _) => code"""$evPrim = UTF8String.fromString($iu.toMultiUnitsString($c));"""
}
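
A fuller sketch of the funcName nit above (my illustration, not code from the PR; note the second $ so the Scala value interpolates into the generated code):

  val funcName = SQLConf.get.intervalOutputStyle match {
    case SQL_STANDARD => "toSqlStandardString"
    case ISO_8601 => "toIso8601String"
    case MULTI_UNITS => "toMultiUnitsString"
  }
  (c, evPrim, _) => code"""$evPrim = UTF8String.fromString($iu.$funcName($c));"""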
case ArrayType(et, _) =>
(c, evPrim, evNull) => {
val buffer = ctx.freshVariable("buffer", classOf[UTF8StringBuilder])
File: literals.scala (org.apache.spark.sql.catalyst.expressions)
@@ -409,6 +409,7 @@ case class Literal (value: Any, dataType: DataType) extends LeafExpression {
DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
s"TIMESTAMP('${formatter.format(v)}')"
case (v: Array[Byte], BinaryType) => s"X'${DatatypeConverter.printHexBinary(v)}'"
case (v: CalendarInterval, CalendarIntervalType) => IntervalUtils.toMultiUnitsString(v)
[Member] Sorry if this was already asked above, but why didn't we change this one?

[Member, author] We don't support parsing intervals from the ISO/SQL-standard formats yet.

[Member] Why not support the ISO/SQL-standard formats here at the same time?
case _ => value.toString
}
}
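
For reference, what the new arm produces (my own example):

  IntervalUtils.toMultiUnitsString(new CalendarInterval(34, 0, 0))
  // "2 years 10 months", always multi-units here, regardless of spark.sql.intervalOutputStyle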
File: JSONOptions.scala
@@ -26,6 +26,8 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.IntervalStyle
import org.apache.spark.sql.internal.SQLConf.IntervalStyle.IntervalStyle
[Contributor] unused import


/**
* Options for parsing JSON data into Spark SQL rows.
@@ -92,6 +94,9 @@ private[sql] class JSONOptions(
val timestampFormat: String =
parameters.getOrElse("timestampFormat", "uuuu-MM-dd'T'HH:mm:ss.SSSXXX")

  val intervalOutputStyle: IntervalStyle = parameters.get("intervalOutputStyle")
    .map(IntervalStyle.withName).getOrElse(SQLConf.get.intervalOutputStyle)

[Contributor] Can we add it in another PR? We need to update the docs and the PySpark side, which is non-trivial.

[Member, author] OK, I will keep intervals in the JSON format in the multi-units style.
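
A hypothetical per-call use of the option as written in this revision (my sketch; the thread above suggests the option may be dropped, and df and column i are made-up names):

  import org.apache.spark.sql.functions.{col, struct, to_json}
  df.select(to_json(struct(col("i")), Map("intervalOutputStyle" -> "ISO_8601")))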

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

/**
File: JacksonGenerator.scala
@@ -24,8 +24,8 @@ import com.fasterxml.jackson.core._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf.IntervalStyle._
import org.apache.spark.sql.types._

[Contributor] nit: let's keep the blank line here.

[Member, author] Thanks, my fault.

/**
* `JackGenerator` can only be initialized with a `StructType`, a `MapType` or an `ArrayType`.
* Once it is initialized with `StructType`, it can be used to write out a struct or an array of
@@ -119,6 +119,16 @@ private[sql] class JacksonGenerator(
(row: SpecializedGetters, ordinal: Int) =>
gen.writeNumber(row.getDouble(ordinal))

case CalendarIntervalType =>
[Contributor] Let's deal with JSON in another PR.

[Member] I guess this comment #26102 (comment) is valid for the JSON datasource as well.

[Member] Why wasn't this comment addressed?
(row: SpecializedGetters, ordinal: Int) => options.intervalOutputStyle match {
case SQL_STANDARD =>
gen.writeString(IntervalUtils.toSqlStandardString(row.getInterval(ordinal)))
case ISO_8601 =>
gen.writeString(IntervalUtils.toIso8601String(row.getInterval(ordinal)))
case _ =>
gen.writeString(IntervalUtils.toMultiUnitsString(row.getInterval(ordinal)))
}
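
For reference, the three shapes this writer emits for one hypothetical interval (my reading of the IntervalUtils functions later in this diff):

  val i = new CalendarInterval(14, 0, 0)
  IntervalUtils.toSqlStandardString(i)  // "+1-2"
  IntervalUtils.toIso8601String(i)      // "P1Y2M"
  IntervalUtils.toMultiUnitsString(i)   // "1 years 2 months"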

case StringType =>
(row: SpecializedGetters, ordinal: Int) =>
gen.writeString(row.getUTF8String(ordinal).toString)
@@ -214,10 +224,21 @@ private[sql] class JacksonGenerator(
private def writeMapData(
map: MapData, mapType: MapType, fieldWriter: ValueWriter): Unit = {
val keyArray = map.keyArray()
val keyString = mapType.keyType match {
case CalendarIntervalType => options.intervalOutputStyle match {
case SQL_STANDARD =>
(i: Int) => IntervalUtils.toSqlStandardString(keyArray.getInterval(i))
case ISO_8601 =>
(i: Int) => IntervalUtils.toIso8601String(keyArray.getInterval(i))
case _ =>
(i: Int) => IntervalUtils.toMultiUnitsString(keyArray.getInterval(i))
}
case _ => (i: Int) => keyArray.get(i, mapType.keyType).toString
[Contributor] It's fragile to rely on toString; e.g. UnsafeRow.toString is not human-readable. Shall we recursively write the map key as a JSON object? cc @HyukjinKwon

[Member] Ah, I am sorry I missed this cc. In JSON the key should be a string. We should either make it a string always or explicitly disallow it.

[Member] cc @viirya I think we talked about this before.

[Member] Yeah, I think the map key is currently not very useful for some types. To make map keys human-readable, we need type-specific serialization for some map key types. Maybe I should create a JIRA ticket to follow it up?

[Member] Yeah .. +1!

[Member] BTW, this code path shouldn't be evaluated per each map.
}
val valueArray = map.valueArray()
var i = 0
while (i < map.numElements()) {
-      gen.writeFieldName(keyArray.get(i, mapType.keyType).toString)
+      gen.writeFieldName(keyString(i))
if (!valueArray.isNullAt(i)) {
fieldWriter.apply(valueArray, i)
} else {
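
For instance (my own example, via writeMapData above): an interval map key under the default MULTI_UNITS style becomes a plain multi-units field name:

  // Map(new CalendarInterval(14, 3, 0) -> 1)  =>  {"1 years 2 months 3 days":1}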
File: IntervalUtils.scala (org.apache.spark.sql.catalyst.util)
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.util

import java.math.BigDecimal
import java.util.concurrent.TimeUnit

import scala.util.control.NonFatal
@@ -424,6 +425,111 @@ object IntervalUtils {
fromDoubles(interval.months / num, interval.days / num, interval.microseconds / num)
}

def toMultiUnitsString(interval: CalendarInterval): String = {
if (interval.months == 0 && interval.days == 0 && interval.microseconds == 0) {
return "0 seconds"
}
val sb = new StringBuilder
if (interval.months != 0) {
appendUnit(sb, interval.months / 12, "years")
appendUnit(sb, interval.months % 12, "months")
}
appendUnit(sb, interval.days, "days")
if (interval.microseconds != 0) {
var rest = interval.microseconds
appendUnit(sb, rest / MICROS_PER_HOUR, "hours")
rest %= MICROS_PER_HOUR
appendUnit(sb, rest / MICROS_PER_MINUTE, "minutes")
rest %= MICROS_PER_MINUTE
if (rest != 0) {
val s = BigDecimal.valueOf(rest, 6).stripTrailingZeros.toPlainString
sb.append(s).append(" seconds ")
}
}
sb.setLength(sb.length - 1)
sb.toString
}

private def appendUnit(sb: StringBuilder, value: Long, unit: String): Unit = {
if (value != 0) sb.append(value).append(' ').append(unit).append(' ')
}
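
These expectations carry over from the toStringTest removed above:

  toMultiUnitsString(new CalendarInterval(0, 0, 0))    // "0 seconds"
  toMultiUnitsString(new CalendarInterval(34, 0, 0))   // "2 years 10 months"
  toMultiUnitsString(new CalendarInterval(-34, 0, 0))  // "-2 years -10 months"
  toMultiUnitsString(new CalendarInterval(34, 31, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123))
  // "2 years 10 months 31 days 3 hours 13 minutes 0.000123 seconds"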

def toSqlStandardString(interval: CalendarInterval): String = {
val yearMonthPart = if (interval.months < 0) {
val ma = math.abs(interval.months)
"-" + ma / 12 + "-" + ma % 12
} else if (interval.months > 0) {
"+" + interval.months / 12 + "-" + interval.months % 12
} else {
""
}

val dayPart = if (interval.days < 0) {
interval.days.toString
[Contributor] Shouldn't we add a '-'?

[Member, author] Yes, but it is already there; this is effectively (-1).toString, which includes the sign.

} else if (interval.days > 0) {
"+" + interval.days
} else {
""
}

val timePart = if (interval.microseconds != 0) {
val sign = if (interval.microseconds > 0) "+" else "-"
val sb = new StringBuilder(sign)
var rest = math.abs(interval.microseconds)
sb.append(rest / MICROS_PER_HOUR)
sb.append(':')
rest = rest % MICROS_PER_HOUR
[Contributor] nit: rest %= ...

val minutes = rest / MICROS_PER_MINUTE;
if (minutes < 10) {
sb.append(0)
}
sb.append(minutes)
sb.append(':')
rest %= MICROS_PER_MINUTE
val bd = BigDecimal.valueOf(rest, 6)
if (bd.compareTo(new BigDecimal(10)) < 0) {
sb.append(0)
}
val s = bd.stripTrailingZeros().toPlainString
sb.append(s)
sb.toString()
} else {
""
}

val intervalList = Seq(yearMonthPart, dayPart, timePart).filter(_.nonEmpty)
if (intervalList.nonEmpty) intervalList.mkString(" ") else "0"
[Contributor] Wow, a single 0 is also SQL standard?

[Member, author] Yes, PostgreSQL prints a bare 0 under IntervalStyle=sql_standard:

  postgres=# set IntervalStyle=sql_standard;
  SET
  postgres=# select interval '0';
   interval
  ----------
   0
  (1 row)

  postgres=# set IntervalStyle=postgres;
  SET
  postgres=# select interval '0';
   interval
  ----------
   00:00:00
  (1 row)
}
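
A worked example of the SQL-standard form, from my reading of the code above (hypothetical values; 14706000000 microseconds is 4h 5m 6s):

  toSqlStandardString(new CalendarInterval(14, 3, 14706000000L))  // "+1-2 +3 +4:05:06"
  toSqlStandardString(new CalendarInterval(0, 0, 0))              // "0"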

def toIso8601String(interval: CalendarInterval): String = {
val sb = new StringBuilder("P")

val year = interval.months / 12
if (year != 0) sb.append(year + "Y")
val month = interval.months % 12
if (month != 0) sb.append(month + "M")

if (interval.days != 0) sb.append(interval.days + "D")

if (interval.microseconds != 0) {
sb.append('T')
var rest = interval.microseconds
val hour = rest / MICROS_PER_HOUR
if (hour != 0) sb.append(hour + "H")
rest %= MICROS_PER_HOUR
val minute = rest / MICROS_PER_MINUTE
if (minute != 0) sb.append(minute + "M")
rest %= MICROS_PER_MINUTE
if (rest != 0) {
val bd = BigDecimal.valueOf(rest, 6)
sb.append(bd.stripTrailingZeros().toPlainString + "S")
}
} else if (interval.days == 0 && interval.months == 0) {
sb.append("T0S")
}
sb.toString()
}

private object ParseState extends Enumeration {
type ParseState = Value

File: SQLConf.scala (org.apache.spark.sql.internal)
@@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
import org.apache.spark.sql.catalyst.plans.logical.HintErrorHandler
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.util.Utils

@@ -1774,6 +1773,21 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

object IntervalStyle extends Enumeration {
type IntervalStyle = Value
val SQL_STANDARD, ISO_8601, MULTI_UNITS = Value
}

val INTERVAL_STYLE = buildConf("spark.sql.intervalOutputStyle")
[Member] FYI: we might need to move this config under spark.sql.dialect.xxx along with #26444.

[yaooqinn (author), Nov 13, 2019] I don't know; the behavior of this config goes beyond the meaning of a single dialect.

.doc("Display format for interval values. The value SQL_STANDARD will produce output" +
[cloud-fan, Nov 12, 2019] "Display" is not accurate, as this controls the cast behavior as well. How about: "When converting interval values to strings (e.g. for display), this config decides the interval string format. The value SQL_STANDARD ..."

[yaooqinn (author), Nov 13, 2019] OK, I will make it much clearer.

" matching SQL standard interval literals. The value ISO_8601 will produce output matching" +
" the ISO 8601 standard. The value MULTI_UNITS (which is the default) will produce output" +
" in form of value unit pairs, i.e. '3 year 2 months 10 days'")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(IntervalStyle.values.map(_.toString))
.createWithDefault(IntervalStyle.MULTI_UNITS.toString)
[Member] I personally think ansiEnabled is enough for this feature. Any concerns?

[Member, author] Yes; I guess some users may already rely on the current output string.
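
For illustration, an end-to-end sketch of the config with the new cast behavior (my expectation, combining the Cast and IntervalUtils changes above):

  spark.conf.set("spark.sql.intervalOutputStyle", "SQL_STANDARD")
  spark.sql("SELECT CAST(INTERVAL 1 YEAR 2 MONTHS AS STRING)").head.getString(0)
  // "+1-2"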


val SORT_BEFORE_REPARTITION =
buildConf("spark.sql.execution.sortBeforeRepartition")
.internal()
@@ -2502,6 +2516,8 @@ class SQLConf extends Serializable with Logging {
def storeAssignmentPolicy: StoreAssignmentPolicy.Value =
StoreAssignmentPolicy.withName(getConf(STORE_ASSIGNMENT_POLICY))

def intervalOutputStyle: IntervalStyle.Value = IntervalStyle.withName(getConf(INTERVAL_STYLE))

def ansiEnabled: Boolean = getConf(ANSI_ENABLED)

def usePostgreSQLDialect: Boolean = getConf(DIALECT) == Dialect.POSTGRESQL.toString()