@@ -2589,6 +2589,8 @@ object Sequence {
}
}

+ // To generate time sequences, we use scale 1 in TemporalSequenceImpl
+ // for `TimestampType`, while MICROS_PER_DAY for `DateType`
Contributor

If start/end is a date, can the step be in seconds/minutes/hours?

Contributor Author (@TJX2014, Jun 23, 2020)

Yes, it seems we can. The results are as follows:

scala> sql("select explode(sequence(cast('2011-03-01' as date), cast('2011-05-01' as date), interval 1 hour))").count
res19: Long = 1465

scala> sql("select explode(sequence(cast('2011-03-01' as date), cast('2011-05-01' as date), interval 1 minute))").count
res20: Long = 87841

scala> sql("select explode(sequence(cast('2011-03-01' as date), cast('2011-05-01' as date), interval 1 second))").count
res21: Long = 5270401

scala> sql("select explode(sequence(cast('2011-03-01' as date), cast('2011-05-01' as date), interval 1 minute))").head(3)
res25: Array[org.apache.spark.sql.Row] = Array([2011-03-01], [2011-03-01], [2011-03-01])

scala> sql("select explode(sequence(cast('2011-03-01' as date), cast('2011-05-01' as date), interval 1 second))").head(3)
res26: Array[org.apache.spark.sql.Row] = Array([2011-03-01], [2011-03-01], [2011-03-01])

scala> sql("select explode(sequence(cast('2011-03-01' as date), cast('2011-05-01' as date), interval 1 minute))").head(3)
res27: Array[org.apache.spark.sql.Row] = Array([2011-03-01], [2011-03-01], [2011-03-01])

scala> sql("select explode(sequence(cast('2011-03-01' as date), cast('2011-05-01' as date), interval 1 hour))").head(3)
res28: Array[org.apache.spark.sql.Row] = Array([2011-03-01], [2011-03-01], [2011-03-01])
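
The counts and the repeated dates in this output are consistent with stepping in sub-day units and then truncating every element back to a date. The following is a minimal sketch of that effect using only `java.time` in UTC; `DateSequenceSketch` and `datesBySubDayStep` are hypothetical names for illustration and are not part of Spark.

```scala
import java.time.{Duration, LocalDate, ZoneOffset}

object DateSequenceSketch {
  // Hypothetical helper: step from `start` to `stop` (inclusive) by a fixed
  // duration, then truncate every element back to a calendar date in UTC.
  def datesBySubDayStep(start: LocalDate, stop: LocalDate, step: Duration): Vector[LocalDate] = {
    val startInstant = start.atStartOfDay(ZoneOffset.UTC).toInstant
    val stopInstant = stop.atStartOfDay(ZoneOffset.UTC).toInstant
    Iterator
      .iterate(startInstant)(_.plus(step))
      .takeWhile(!_.isAfter(stopInstant))
      .map(_.atOffset(ZoneOffset.UTC).toLocalDate)
      .toVector
  }
}
```

With a one-hour step between 2011-03-01 and 2011-05-01, this sketch yields 1465 elements (61 days × 24 hours, plus the inclusive end point), and the first three all truncate to 2011-03-01.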

Contributor

Does pgsql support it?

Contributor Author

It seems pgsql sequences only support integers, as follows:
postgres= create sequence seq_test;
CREATE SEQUENCE
postgres= select nextval('seq_test');
1
(1 row)
postgres= select nextval('seq_test');
2
(1 row)

Contributor

Actually this function is from presto: https://prestodb.io/docs/current/functions/array.html

Can you check the behavior of presto? It looks confusing to use time fields as the step for date start/stop.

Contributor Author (@TJX2014, Jun 24, 2020)

Based on presto-server-0.236:
presto> select sequence(date('2011-03-01'),date('2011-03-02'),interval '1' hour);
Query 20200624_122744_00002_pehix failed: sequence step must be a day interval if start and end values are dates
presto> select sequence(date('2011-03-01'),date('2011-03-02'),interval '1' day);
_col0
[2011-03-01, 2011-03-02]
(1 row)
Query 20200624_122757_00003_pehix, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]
presto> select sequence(date('2011-03-01'),date('2011-03-02'),interval '1' month);
_col0
[2011-03-01]
(1 row)

Query 20200624_122806_00004_pehix, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]
presto> select sequence(date('2011-03-01'),date('2011-03-02'),interval '1' year);
_col0
[2011-03-01]
(1 row)
Query 20200624_122810_00005_pehix, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]

Contributor Author (@TJX2014, Jun 24, 2020)

@cloud-fan Done. It seems that in Presto the sequence step can be a day, month, or year interval when start and end are DateType.

Contributor

I think the presto behavior makes sense. Can we send a PR to follow it first? e.g. throw an exception if the step is time fields while start/end is date. This can also simplify the implementation.
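
A minimal sketch of the kind of check proposed here, under stated assumptions: `IntervalStep` is a hypothetical stand-in for an interval of (months, days, microseconds), `requireDateStep` is an illustrative name, and the error text is borrowed from the Presto output above. The check that an actual Spark change would use may differ.

```scala
object DateStepValidationSketch {
  // Hypothetical stand-in for an interval of (months, days, microseconds).
  final case class IntervalStep(months: Int, days: Int, micros: Long)

  // Reject steps that carry a time-of-day component when start/stop are dates.
  // Month and day components are accepted, as in the Presto examples.
  def requireDateStep(step: IntervalStep): IntervalStep = {
    require(
      step.micros == 0L,
      "sequence step must be a day interval if start and end values are dates")
    step
  }
}
```

Here `require` throws an `IllegalArgumentException` for a step such as one hour, while month-only or day-only steps pass through unchanged.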

Contributor Author

Ok, I will do it tomorrow.

private class TemporalSequenceImpl[T: ClassTag]
(dt: IntegralType, scale: Long, fromLong: Long => T, zoneId: ZoneId)
(implicit num: Integral[T]) extends SequenceImpl {
@@ -2623,8 +2625,16 @@ object Sequence {
// about a month length in days and a day length in microseconds
val intervalStepInMicros =
Contributor

It looks like the step should be a logical interval rather than physical seconds. cc @MaxGekk

Member

The current implementation seems to be a strange mix. There are the following options:

  1. The step is an interval of (months, days, micros):

    1. If the start point is TimestampType, we should convert it to a local date-time in the session time zone and add the interval by time components. The intermediate local timestamps should be converted back to micros using the session time zone, but we should keep adding the interval to the local timestamp "accumulator".
    2. The same for dates: convert start to a local date. The time zone shouldn't be involved here.
  2. The step is a duration in micros or days (this is not our case):

    1. If start is TimestampType, we shouldn't convert it to a local timestamp, and should just add micros to instants. So the time zone is not involved here.
    2. If start is DateType, just add the number of days. As with timestamps, the time zone is not involved here.
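
The difference between the two options for timestamps can be sketched with plain `java.time`. This is an illustration only, not Spark code: `IntervalVsDurationSketch`, `byInterval`, and `byDuration` are hypothetical names, and `Period` stands in for the months/days part of an interval.

```scala
import java.time.{Duration, Instant, LocalDateTime, Period, ZoneId}

object IntervalVsDurationSketch {
  // Option 1: the step is a logical interval. It is added to a local
  // date-time, and each element is converted to an instant using the zone.
  def byInterval(start: LocalDateTime, step: Period, n: Int, zone: ZoneId): Vector[Instant] =
    Vector.tabulate(n)(i => start.plus(step.multipliedBy(i)).atZone(zone).toInstant)

  // Option 2: the step is a physical duration added to instants.
  // No time zone is involved.
  def byDuration(start: Instant, step: Duration, n: Int): Vector[Instant] =
    Vector.tabulate(n)(i => start.plus(step.multipliedBy(i.toLong)))
}
```

Starting at 2011-03-12 12:00 in America/Chicago, a one-day interval lands on 12:00 local time the next day, whereas a 24-hour duration lands one hour later in local time, because daylight saving time began there on 2011-03-13.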

Contributor Author (@TJX2014, Jun 22, 2020)

Hi @cloud-fan, given what @MaxGekk explains here, I am not sure whether this patch looks OK. I am willing to add more documentation to TemporalSequenceImpl, but I am not sure whether we should continue this way or refactor a little.

stepMicros + stepMonths * microsPerMonth + stepDays * microsPerDay
- val startMicros: Long = num.toLong(start) * scale
- val stopMicros: Long = num.toLong(stop) * scale

+ // Date to timestamp is not equal from GMT and Chicago timezones
Member

Why does this code depend on a few specific time zones? Also, the other comments look valid here. We should take the time zone into account for timestamps too.

Contributor Author

It seems the date differs from west to east. When the value is a date, we may need to take the zone info into account to convert it to a timestamp; when it is already a timestamp, we can ignore the zone because the timestamp already accounts for it.

+ val (startMicros, stopMicros) = if (scale == 1) {
+   (num.toLong(start), num.toLong(stop))
Member

I don't think this is correct (see my comment https://github.com/apache/spark/pull/28856/files#r442366706), but is it at least backward compatible?

Contributor Author

Maybe we could separate this into different methods?

+ }
+ else {
+   (daysToMicros(num.toInt(start), zoneId),
+     daysToMicros(num.toInt(stop), zoneId))
Contributor

Can you explain a bit more? It's hard to understand this change without any comment.

Member

Yes, please explain why start and stop contain days when scale != 1.

Contributor Author

Because when scale != 1, the value is a day count, so we may need to use the zone info to translate it into microseconds to get a correct result, rather than just multiplying by MICROS_PER_DAY, which ignores the time zone.
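
To make the two conversions concrete, here is a minimal sketch written with `java.time`. The names `DaysToMicrosSketch`, `daysToMicrosUtc`, and `daysToMicrosInZone` are hypothetical; the zone-aware function is a stand-in for illustration and is not Spark's `DateTimeUtils.daysToMicros`.

```scala
import java.time.{Instant, LocalDate, ZoneId}
import java.time.temporal.ChronoUnit

object DaysToMicrosSketch {
  val MicrosPerDay: Long = 24L * 60 * 60 * 1000 * 1000

  // Scale-based conversion: midnight UTC of the given epoch day.
  def daysToMicrosUtc(days: Int): Long = days * MicrosPerDay

  // Zone-aware conversion: midnight of the given epoch day in `zone`,
  // expressed as microseconds since the epoch.
  def daysToMicrosInZone(days: Int, zone: ZoneId): Long = {
    val instant = LocalDate.ofEpochDay(days.toLong).atStartOfDay(zone).toInstant
    ChronoUnit.MICROS.between(Instant.EPOCH, instant)
  }
}
```

For 2011-03-01, the two results agree in UTC but differ by six hours in America/Chicago, which was at UTC-6 on that date.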

Contributor

> Because when scale != 1,it is converted to day count

How can we tell?

Contributor

Maybe we should add more documentation to TemporalSequenceImpl first, to understand what it is doing.

Contributor Author

Done. Maybe we can pass the scale through the constructor:
private class TemporalSequenceImpl[T: ClassTag]
(dt: IntegralType, scale: Long, fromLong: Long => T, zoneId: ZoneId)

+ }

val maxEstimatedArrayLength =
getSequenceLength(startMicros, stopMicros, intervalStepInMicros)

@@ -2635,7 +2645,11 @@ object Sequence {
  var i = 0

  while (t < exclusiveItem ^ stepSign < 0) {
-   arr(i) = fromLong(t / scale)
+   arr(i) = if (scale == 1) {
+     fromLong(t)
+   } else {
+     fromLong(Math.round(t / scale.toFloat))
+   }
    i += 1
    t = timestampAddInterval(
      startMicros, i * stepMonths, i * stepDays, i * stepMicros, zoneId)
@@ -2685,8 +2699,19 @@ object Sequence {
  |} else if ($stepMonths == 0 && $stepDays == 0 && ${scale}L == 1) {
  |  ${backedSequenceImpl.genCode(ctx, start, stop, stepMicros, arr, elemType)};
  |} else {
- |  final long $startMicros = $start * ${scale}L;
- |  final long $stopMicros = $stop * ${scale}L;
+ |  long $startMicros;
+ |  long $stopMicros;
+ |  if (${scale}L == 1L) {
+ |    $startMicros = $start;
+ |    $stopMicros = $stop;
+ |  } else {
+ |    $startMicros =
+ |      org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMicros(
+ |        (int)$start, $zid);
+ |    $stopMicros =
+ |      org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMicros(
+ |        (int)$stop, $zid);
+ |  }
  |
  |  $sequenceLengthCode
  |
@@ -2698,7 +2723,11 @@ object Sequence {
  |  int $i = 0;
  |
  |  while ($t < $exclusiveItem ^ $stepSign < 0) {
- |    $arr[$i] = ($elemType) ($t / ${scale}L);
+ |    if (${scale}L == 1L) {
+ |      $arr[$i] = ($elemType) $t;
+ |    } else {
+ |      $arr[$i] = ($elemType) (Math.round($t / (float)${scale}L));
+ |    }
  |    $i += 1;
  |    $t = org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampAddInterval(
  |      $startMicros, $i * $stepMonths, $i * $stepDays, $i * $stepMicros, $zid);
@@ -1854,4 +1854,17 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
        Literal(stringToInterval("interval 1 year"))),
        Seq(Date.valueOf("2018-01-01")))
    }
+
+   test("SPARK-31982: sequence doesn't handle date increments that cross DST") {
+     Array("America/Chicago", "GMT", "Asia/Shanghai").foreach(tz => {
+       DateTimeTestUtils.withDefaultTimeZone(DateTimeUtils.getZoneId(tz)) {
+         checkEvaluation(Sequence(
+           Cast(Literal("2011-03-01"), DateType),
+           Cast(Literal("2011-04-01"), DateType),
+           Option(Literal(stringToInterval("interval 1 month"))),
+           Option(tz)),
+           Seq(Date.valueOf("2011-03-01"), Date.valueOf("2011-04-01")))
+       }
+     })
+   }
  }
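
The scenario this test covers can be reproduced outside Spark with a small `java.time` sketch. It rests on assumptions: `secondElementScaled` models the removed lines (multiply the day count by the scale, add the month in the session zone, divide by the scale again), and it assumes that `timestampAddInterval` adds months to a zoned date-time. `secondElementZoned` reads the date back in the same zone, which is a simplification and not the `Math.round` approach in the patch. All names are hypothetical, and seconds are used instead of microseconds for brevity.

```scala
import java.time.{Instant, LocalDate, ZoneId}

object MonthStepAcrossDstSketch {
  private val SecondsPerDay = 86400L

  // Models the removed lines: start at midnight UTC of the epoch day, add
  // one month in `zone`, then floor-divide by the day length to get a date.
  def secondElementScaled(start: LocalDate, zone: ZoneId): LocalDate = {
    val startInstant = Instant.ofEpochSecond(start.toEpochDay * SecondsPerDay)
    val next = startInstant.atZone(zone).plusMonths(1).toInstant
    LocalDate.ofEpochDay(Math.floorDiv(next.getEpochSecond, SecondsPerDay))
  }

  // Zone-aware variant: start at local midnight in `zone`, add one month,
  // and read the date back in the same zone.
  def secondElementZoned(start: LocalDate, zone: ZoneId): LocalDate =
    start.atStartOfDay(zone).plusMonths(1).toLocalDate
}
```

For a start date of 2011-03-01 in America/Chicago, the scaled variant produces 2011-03-28 as the second element, because midnight UTC falls on 2011-02-28 in local time, while the zone-aware variant produces 2011-04-01. In GMT both variants agree on 2011-04-01.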