1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.sql/functions.rst
@@ -142,6 +142,7 @@ Datetime Functions
window
session_window
timestamp_seconds
window_time


Collection Functions
46 changes: 46 additions & 0 deletions python/pyspark/sql/functions.py
@@ -4884,6 +4884,52 @@ def check_string_field(field, fieldName): # type: ignore[no-untyped-def]
return _invoke_function("window", time_col, windowDuration)


def window_time(
windowColumn: "ColumnOrName",
) -> Column:
"""Computes the event time from a window column. The column window values are produced
by window aggregating operators and are of type `STRUCT<start: TIMESTAMP, end: TIMESTAMP>`
where start is inclusive and end is exclusive. The event time of records produced by window
aggregating operators can be computed as ``window_time(window)`` and are
``window.end - lit(1).alias("microsecond")`` (as microsecond is the minimal supported event
time precision). The window column must be one produced by a window aggregating operator.

.. versionadded:: 3.4.0

Parameters
----------
windowColumn : :class:`~pyspark.sql.Column`
The window column produced by a window aggregating operator.

Returns
-------
:class:`~pyspark.sql.Column`
the column for computed results.

Examples
--------
>>> import datetime
>>> df = spark.createDataFrame(
... [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)],
... ).toDF("date", "val")

Group the data into 5 second time windows and aggregate as sum.

>>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))

Extract the window event time using the window_time function.

>>> w.select(
... w.window.end.cast("string").alias("end"),
... window_time(w.window).cast("string").alias("window_time"),
... "sum"
... ).collect()
[Row(end='2016-03-11 09:00:10', window_time='2016-03-11 09:00:09.999999', sum=1)]
"""
window_col = _to_java_column(windowColumn)
return _invoke_function("window_time", window_col)


def session_window(timeColumn: "ColumnOrName", gapDuration: Union[Column, str]) -> Column:
"""
Generates session window given a timestamp specifying column.
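A minimal batch sketch of how the new function composes (not part of this diff; it assumes a local SparkSession and reuses the toy data from the docstring example): ``window_time(window)`` equals ``window.end`` minus one microsecond, so the result can serve as the event time for a second, coarser windowed aggregation.

```python
# Sketch only: chain a second aggregation on the event time produced by
# window_time. window_time(window) always falls inside the original window.
import datetime
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()

df = spark.createDataFrame(
    [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1),
     (datetime.datetime(2016, 3, 11, 9, 0, 12), 2)],
    ["date", "val"],
)

# First aggregation: 5-second tumbling windows.
w = df.groupBy(F.window("date", "5 seconds")).agg(F.sum("val").alias("sum"))

# Second aggregation: 10-second windows keyed on the event time of each
# 5-second result row.
w2 = (
    w.select(F.window_time(w.window).alias("event_time"), "sum")
     .groupBy(F.window("event_time", "10 seconds"))
     .agg(F.sum("sum").alias("total"))
)
w2.show(truncate=False)
```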
16 changes: 16 additions & 0 deletions python/pyspark/sql/tests/test_functions.py
@@ -894,6 +894,22 @@ def test_window_functions_cumulative_sum(self):
for r, ex in zip(rs, expected):
self.assertEqual(tuple(r), ex[: len(r)])

def test_window_time(self):
df = self.spark.createDataFrame(
[(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)], ["date", "val"]
)
from pyspark.sql import functions as F

w = df.groupBy(F.window("date", "5 seconds")).agg(F.sum("val").alias("sum"))
r = w.select(
w.window.end.cast("string").alias("end"),
F.window_time(w.window).cast("string").alias("window_time"),
"sum",
).collect()
self.assertEqual(
r[0], Row(end="2016-03-11 09:00:10", window_time="2016-03-11 09:00:09.999999", sum=1)
)

def test_collect_functions(self):
df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
from pyspark.sql import functions
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -56,7 +56,6 @@ import org.apache.spark.sql.internal.connector.V1Function
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DayTimeIntervalType.DAY
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.{Utils => CUtils}

@@ -313,6 +312,7 @@ class Analyzer(override val catalogManager: CatalogManager)
ResolveAggregateFunctions ::
TimeWindowing ::
SessionWindowing ::
ResolveWindowTime ::
ResolveDefaultColumns(v1SessionCatalog) ::
ResolveInlineTables ::
ResolveLambdaVariables ::
@@ -3965,242 +3965,6 @@ object EliminateEventTimeWatermark extends Rule[LogicalPlan] {
}
}

/**
* Maps a time column to multiple time windows using the Expand operator. Since it's non-trivial to
* figure out how many windows a time column can map to, we over-estimate the number of windows and
* filter out the rows where the time column is not inside the time window.
*/
object TimeWindowing extends Rule[LogicalPlan] {
import org.apache.spark.sql.catalyst.dsl.expressions._

private final val WINDOW_COL_NAME = "window"
private final val WINDOW_START = "start"
private final val WINDOW_END = "end"

/**
* Generates the logical plan for generating window ranges on a timestamp column. Without
* knowing what the timestamp value is, it's non-trivial to figure out deterministically how many
* window ranges a timestamp will map to given all possible combinations of a window duration,
* slide duration and start time (offset). Therefore, we express and over-estimate the number of
* windows there may be, and filter the valid windows. We use last Project operator to group
* the window columns into a struct so they can be accessed as `window.start` and `window.end`.
*
* The windows are calculated as below:
* maxNumOverlapping <- ceil(windowDuration / slideDuration)
* for (i <- 0 until maxNumOverlapping)
* lastStart <- timestamp - (timestamp - startTime + slideDuration) % slideDuration
* windowStart <- lastStart - i * slideDuration
* windowEnd <- windowStart + windowDuration
* return windowStart, windowEnd
*
* This behaves as follows for the given parameters for the time: 12:05. The valid windows are
* marked with a +, and invalid ones are marked with a x. The invalid ones are filtered using the
* Filter operator.
* window: 12m, slide: 5m, start: 0m :: window: 12m, slide: 5m, start: 2m
* 11:55 - 12:07 + 11:52 - 12:04 x
* 12:00 - 12:12 + 11:57 - 12:09 +
* 12:05 - 12:17 + 12:02 - 12:14 +
*
* @param plan The logical plan
* @return the logical plan that will generate the time windows using the Expand operator, with
* the Filter operator for correctness and Project for usability.
*/
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
_.containsPattern(TIME_WINDOW), ruleId) {
case p: LogicalPlan if p.children.size == 1 =>
val child = p.children.head
val windowExpressions =
p.expressions.flatMap(_.collect { case t: TimeWindow => t }).toSet

val numWindowExpr = p.expressions.flatMap(_.collect {
case s: SessionWindow => s
case t: TimeWindow => t
}).toSet.size

// Only support a single window expression for now
if (numWindowExpr == 1 && windowExpressions.nonEmpty &&
windowExpressions.head.timeColumn.resolved &&
windowExpressions.head.checkInputDataTypes().isSuccess) {

val window = windowExpressions.head

val metadata = window.timeColumn match {
case a: Attribute => a.metadata
case _ => Metadata.empty
}

def getWindow(i: Int, dataType: DataType): Expression = {
val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType)
val lastStart = timestamp - (timestamp - window.startTime
+ window.slideDuration) % window.slideDuration
val windowStart = lastStart - i * window.slideDuration
val windowEnd = windowStart + window.windowDuration

// We make sure value fields are nullable since the dataType of TimeWindow defines them
// as nullable.
CreateNamedStruct(
Literal(WINDOW_START) ::
PreciseTimestampConversion(windowStart, LongType, dataType).castNullable() ::
Literal(WINDOW_END) ::
PreciseTimestampConversion(windowEnd, LongType, dataType).castNullable() ::
Nil)
}

val windowAttr = AttributeReference(
WINDOW_COL_NAME, window.dataType, metadata = metadata)()

if (window.windowDuration == window.slideDuration) {
val windowStruct = Alias(getWindow(0, window.timeColumn.dataType), WINDOW_COL_NAME)(
exprId = windowAttr.exprId, explicitMetadata = Some(metadata))

val replacedPlan = p transformExpressions {
case t: TimeWindow => windowAttr
}

// For backwards compatibility we add a filter to filter out nulls
val filterExpr = IsNotNull(window.timeColumn)

replacedPlan.withNewChildren(
Project(windowStruct +: child.output,
Filter(filterExpr, child)) :: Nil)
} else {
val overlappingWindows =
math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
val windows =
Seq.tabulate(overlappingWindows)(i =>
getWindow(i, window.timeColumn.dataType))

val projections = windows.map(_ +: child.output)

// When the condition windowDuration % slideDuration = 0 is fulfilled,
// the estimate of the number of windows becomes exact,
// which means all produced windows are valid.
val filterExpr =
if (window.windowDuration % window.slideDuration == 0) {
IsNotNull(window.timeColumn)
} else {
window.timeColumn >= windowAttr.getField(WINDOW_START) &&
window.timeColumn < windowAttr.getField(WINDOW_END)
}

val substitutedPlan = Filter(filterExpr,
Expand(projections, windowAttr +: child.output, child))

val renamedPlan = p transformExpressions {
case t: TimeWindow => windowAttr
}

renamedPlan.withNewChildren(substitutedPlan :: Nil)
}
} else if (numWindowExpr > 1) {
throw QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p)
} else {
p // Return unchanged. Analyzer will throw exception later
}
}
}
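For reference, the candidate-window math described in the TimeWindowing Scaladoc above can be sketched in a few lines of Python (illustration only; timestamps and durations are in microseconds, and the final check mirrors the Filter the rule adds when windowDuration % slideDuration != 0):

```python
# Sketch only: reproduce the candidate-window calculation from TimeWindowing.
import math

def candidate_windows(ts, window_dur, slide_dur, start_time=0):
    """Return the (start, end) windows containing `ts`, all in microseconds."""
    max_overlapping = math.ceil(window_dur / slide_dur)
    last_start = ts - (ts - start_time + slide_dur) % slide_dur
    windows = []
    for i in range(max_overlapping):
        window_start = last_start - i * slide_dur
        window_end = window_start + window_dur
        # Over-estimated candidates that do not contain the timestamp are
        # dropped, like the Filter operator added by the rule.
        if window_start <= ts < window_end:
            windows.append((window_start, window_end))
    return windows

MIN = 60 * 1_000_000  # one minute in microseconds
# 12:05 with window 12m, slide 5m, start 0m -> the three valid windows from
# the example above: 11:55-12:07, 12:00-12:12 and 12:05-12:17.
print(candidate_windows(ts=(12 * 60 + 5) * MIN, window_dur=12 * MIN, slide_dur=5 * MIN))
```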

/** Maps a time column to a session window. */
object SessionWindowing extends Rule[LogicalPlan] {
import org.apache.spark.sql.catalyst.dsl.expressions._

private final val SESSION_COL_NAME = "session_window"
private final val SESSION_START = "start"
private final val SESSION_END = "end"

/**
* Generates the logical plan for generating session window on a timestamp column.
* Each session window is initially defined as [timestamp, timestamp + gap).
*
* This also adds a marker to the session column so that downstream can easily find the column
* on session window.
*/
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case p: LogicalPlan if p.children.size == 1 =>
val child = p.children.head
val sessionExpressions =
p.expressions.flatMap(_.collect { case s: SessionWindow => s }).toSet

val numWindowExpr = p.expressions.flatMap(_.collect {
case s: SessionWindow => s
case t: TimeWindow => t
}).toSet.size

// Only support a single session expression for now
if (numWindowExpr == 1 && sessionExpressions.nonEmpty &&
sessionExpressions.head.timeColumn.resolved &&
sessionExpressions.head.checkInputDataTypes().isSuccess) {

val session = sessionExpressions.head

val metadata = session.timeColumn match {
case a: Attribute => a.metadata
case _ => Metadata.empty
}

val newMetadata = new MetadataBuilder()
.withMetadata(metadata)
.putBoolean(SessionWindow.marker, true)
.build()

val sessionAttr = AttributeReference(
SESSION_COL_NAME, session.dataType, metadata = newMetadata)()

val sessionStart =
PreciseTimestampConversion(session.timeColumn, session.timeColumn.dataType, LongType)
val gapDuration = session.gapDuration match {
case expr if Cast.canCast(expr.dataType, CalendarIntervalType) =>
Cast(expr, CalendarIntervalType)
case other =>
throw QueryCompilationErrors.sessionWindowGapDurationDataTypeError(other.dataType)
}
val sessionEnd = PreciseTimestampConversion(session.timeColumn + gapDuration,
session.timeColumn.dataType, LongType)

// We make sure value fields are nullable since the dataType of SessionWindow defines them
// as nullable.
val literalSessionStruct = CreateNamedStruct(
Literal(SESSION_START) ::
PreciseTimestampConversion(sessionStart, LongType, session.timeColumn.dataType)
.castNullable() ::
Literal(SESSION_END) ::
PreciseTimestampConversion(sessionEnd, LongType, session.timeColumn.dataType)
.castNullable() ::
Nil)

val sessionStruct = Alias(literalSessionStruct, SESSION_COL_NAME)(
exprId = sessionAttr.exprId, explicitMetadata = Some(newMetadata))

val replacedPlan = p transformExpressions {
case s: SessionWindow => sessionAttr
}

val filterByTimeRange = session.gapDuration match {
case Literal(interval: CalendarInterval, CalendarIntervalType) =>
interval == null || interval.months + interval.days + interval.microseconds <= 0
case _ => true
}

// Same as for tumbling windows, we add a filter to filter out nulls.
// And we also filter out events with negative or zero or invalid gap duration.
val filterExpr = if (filterByTimeRange) {
IsNotNull(session.timeColumn) &&
(sessionAttr.getField(SESSION_END) > sessionAttr.getField(SESSION_START))
} else {
IsNotNull(session.timeColumn)
}

replacedPlan.withNewChildren(
Filter(filterExpr,
Project(sessionStruct +: child.output, child)) :: Nil)
} else if (numWindowExpr > 1) {
throw QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p)
} else {
p // Return unchanged. Analyzer will throw exception later
}
}
}
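Likewise, a small illustration (not part of this diff; data and gap duration are made up) of the [timestamp, timestamp + gap) sessions described above, as seen from the Python API; overlapping sessions for the same key are merged by the downstream session-window aggregation:

```python
# Sketch only: each event starts a session of [timestamp, timestamp + gap);
# overlapping sessions of the same key are merged downstream.
import datetime
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()

events = spark.createDataFrame(
    [(datetime.datetime(2016, 3, 11, 9, 0, 7), "u1"),
     (datetime.datetime(2016, 3, 11, 9, 0, 12), "u1"),
     (datetime.datetime(2016, 3, 11, 9, 0, 40), "u1")],
    ["date", "user"],
)

# With a 10-second gap the first two events merge into one session
# ([09:00:07, 09:00:22)); the third starts a new one ([09:00:40, 09:00:50)).
sessions = events.groupBy(F.session_window("date", "10 seconds"), "user").count()
sessions.select("session_window.start", "session_window.end", "user", "count").show(truncate=False)
```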

/**
* Resolve expressions if they contains [[NamePlaceholder]]s.
*/
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -639,6 +639,7 @@ object FunctionRegistry {
expression[Year]("year"),
expression[TimeWindow]("window"),
expression[SessionWindow]("session_window"),
expression[WindowTime]("window_time"),
expression[MakeDate]("make_date"),
expression[MakeTimestamp]("make_timestamp"),
// We keep the 2 expression builders below to have different function docs.
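One consequence of the registration above, shown here as a sketch rather than as part of the change: window_time also becomes callable from SQL expressions, for example via selectExpr on the same toy data used in the docstring.

```python
# Sketch only: window_time is available to SQL thanks to the FunctionRegistry
# entry added in this diff.
import datetime
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()

df = spark.createDataFrame(
    [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)], ["date", "val"]
)
w = df.groupBy(F.window("date", "5 seconds")).agg(F.sum("val").alias("sum"))

# Same result as calling F.window_time(w.window) from the DataFrame API.
w.selectExpr("window_time(window) AS event_time", "sum").show(truncate=False)
```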