Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1980,6 +1980,41 @@ def assert_runs_only_one_job_stage_and_task(job_group_name, f):
# Regression test for SPARK-17514: limit(n).collect() should the perform same as take(n)
assert_runs_only_one_job_stage_and_task("collect_limit", lambda: df.limit(1).collect())

@unittest.skipIf(sys.version_info < (3, 3), "Unittest < 3.3 doesn't support mocking")
def test_unbounded_frames(self):
from unittest.mock import patch
from pyspark.sql import functions as F
from pyspark.sql import window
import importlib

df = self.spark.range(0, 3)

def rows_frame_match():
return "ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING" in df.select(
F.count("*").over(window.Window.rowsBetween(-sys.maxsize, sys.maxsize))
).columns[0]

def range_frame_match():
return "RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING" in df.select(
F.count("*").over(window.Window.rangeBetween(-sys.maxsize, sys.maxsize))
).columns[0]

with patch("sys.maxsize", 2 ** 31 - 1):
importlib.reload(window)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something I don't like but it looks better than alternatives:

  • Converting thresholds to methods so current sys.maxsize is picked up.
  • Mocking thresholds as well.

self.assertTrue(rows_frame_match())
self.assertTrue(range_frame_match())

with patch("sys.maxsize", 2 ** 63 - 1):
importlib.reload(window)
self.assertTrue(rows_frame_match())
self.assertTrue(range_frame_match())

with patch("sys.maxsize", 2 ** 127 - 1):
importlib.reload(window)
self.assertTrue(rows_frame_match())
self.assertTrue(range_frame_match())

importlib.reload(window)

if __name__ == "__main__":
from pyspark.sql.tests import *
Expand Down
30 changes: 16 additions & 14 deletions python/pyspark/sql/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class Window(object):

_JAVA_MIN_LONG = -(1 << 63) # -9223372036854775808
_JAVA_MAX_LONG = (1 << 63) - 1 # 9223372036854775807
_PRECEDING_THRESHOLD = max(-sys.maxsize, _JAVA_MIN_LONG)
_FOLLOWING_THRESHOLD = min(sys.maxsize, _JAVA_MAX_LONG)

unboundedPreceding = _JAVA_MIN_LONG

Expand Down Expand Up @@ -98,9 +100,9 @@ def rowsBetween(start, end):
The frame is unbounded if this is ``Window.unboundedFollowing``, or
any value greater than or equal to 9223372036854775807.
"""
if start <= Window._JAVA_MIN_LONG:
if start <= Window._PRECEDING_THRESHOLD:
start = Window.unboundedPreceding
if end >= Window._JAVA_MAX_LONG:
if end >= Window._FOLLOWING_THRESHOLD:
end = Window.unboundedFollowing
sc = SparkContext._active_spark_context
jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rowsBetween(start, end)
Expand All @@ -123,14 +125,14 @@ def rangeBetween(start, end):

:param start: boundary start, inclusive.
The frame is unbounded if this is ``Window.unboundedPreceding``, or
any value less than or equal to -9223372036854775808.
any value less than or equal to max(-sys.maxsize, -9223372036854775808).
:param end: boundary end, inclusive.
The frame is unbounded if this is ``Window.unboundedFollowing``, or
any value greater than or equal to 9223372036854775807.
any value greater than or equal to min(sys.maxsize, 9223372036854775807).
"""
if start <= Window._JAVA_MIN_LONG:
if start <= Window._PRECEDING_THRESHOLD:
start = Window.unboundedPreceding
if end >= Window._JAVA_MAX_LONG:
if end >= Window._FOLLOWING_THRESHOLD:
end = Window.unboundedFollowing
sc = SparkContext._active_spark_context
jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end)
Expand Down Expand Up @@ -185,14 +187,14 @@ def rowsBetween(self, start, end):

:param start: boundary start, inclusive.
The frame is unbounded if this is ``Window.unboundedPreceding``, or
any value less than or equal to -9223372036854775808.
any value less than or equal to max(-sys.maxsize, -9223372036854775808).
:param end: boundary end, inclusive.
The frame is unbounded if this is ``Window.unboundedFollowing``, or
any value greater than or equal to 9223372036854775807.
any value greater than or equal to min(sys.maxsize, 9223372036854775807).
"""
if start <= Window._JAVA_MIN_LONG:
if start <= Window._PRECEDING_THRESHOLD:
start = Window.unboundedPreceding
if end >= Window._JAVA_MAX_LONG:
if end >= Window._FOLLOWING_THRESHOLD:
end = Window.unboundedFollowing
return WindowSpec(self._jspec.rowsBetween(start, end))

Expand All @@ -211,14 +213,14 @@ def rangeBetween(self, start, end):

:param start: boundary start, inclusive.
The frame is unbounded if this is ``Window.unboundedPreceding``, or
any value less than or equal to -9223372036854775808.
any value less than or equal to max(-sys.maxsize, -9223372036854775808).
:param end: boundary end, inclusive.
The frame is unbounded if this is ``Window.unboundedFollowing``, or
any value greater than or equal to 9223372036854775807.
any value greater than or equal to min(sys.maxsize, 9223372036854775807).
"""
if start <= Window._JAVA_MIN_LONG:
if start <= Window._PRECEDING_THRESHOLD:
start = Window.unboundedPreceding
if end >= Window._JAVA_MAX_LONG:
if end >= Window._FOLLOWING_THRESHOLD:
end = Window.unboundedFollowing
return WindowSpec(self._jspec.rangeBetween(start, end))

Expand Down