13 changes: 13 additions & 0 deletions python/pyspark/sql/connect/expressions.py
@@ -54,6 +54,7 @@
DecimalType,
StringType,
DataType,
TimeType,
TimestampType,
TimestampNTZType,
DayTimeIntervalType,
@@ -248,6 +249,7 @@ def __init__(self, value: Any, dataType: DataType) -> None:
DecimalType,
StringType,
DateType,
TimeType,
TimestampType,
TimestampNTZType,
DayTimeIntervalType,
@@ -298,6 +300,9 @@ def __init__(self, value: Any, dataType: DataType) -> None:
value = DateType().toInternal(value)
else:
value = DateType().toInternal(value.date())
elif isinstance(dataType, TimeType):
assert isinstance(value, datetime.time)
value = TimeType().toInternal(value)
elif isinstance(dataType, TimestampType):
assert isinstance(value, datetime.datetime)
value = TimestampType().toInternal(value)
@@ -352,6 +357,8 @@ def _infer_type(cls, value: Any) -> DataType:
return TimestampNTZType() if is_timestamp_ntz_preferred() else TimestampType()
elif isinstance(value, datetime.date):
return DateType()
elif isinstance(value, datetime.time):
return TimeType()
elif isinstance(value, datetime.timedelta):
return DayTimeIntervalType()
elif isinstance(value, np.generic):
@@ -416,6 +423,9 @@ def _to_value(
elif literal.HasField("date"):
assert dataType is None or isinstance(dataType, DataType)
return DateType().fromInternal(literal.date)
elif literal.HasField("time"):
assert dataType is None or isinstance(dataType, TimeType)
return TimeType().fromInternal(literal.time.nano)
elif literal.HasField("timestamp"):
assert dataType is None or isinstance(dataType, TimestampType)
return TimestampType().fromInternal(literal.timestamp)
@@ -468,6 +478,9 @@ def to_plan(self, session: "SparkConnectClient") -> "proto.Expression":
expr.literal.string = str(self._value)
elif isinstance(self._dataType, DateType):
expr.literal.date = int(self._value)
elif isinstance(self._dataType, TimeType):
expr.literal.time.precision = self._dataType.precision
expr.literal.time.nano = int(self._value)
elif isinstance(self._dataType, TimestampType):
expr.literal.timestamp = int(self._value)
elif isinstance(self._dataType, TimestampNTZType):
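To make the new literal encoding concrete: a standalone sketch (not part of the PR) of how a datetime.time value becomes the proto literal, assuming TimeType.toInternal returns nanoseconds since midnight, as defined in python/pyspark/sql/types.py later in this diff; the field names time.nano and time.precision are taken from to_plan above.

    import datetime

    t = datetime.time(12, 13, 14)
    # Same arithmetic as TimeType.toInternal: nanoseconds since midnight.
    nanos = (
        t.hour * 3_600_000_000_000
        + t.minute * 60_000_000_000
        + t.second * 1_000_000_000
        + t.microsecond * 1_000
    )
    assert nanos == 43994000000000
    # to_plan then sets:
    #   expr.literal.time.precision = self._dataType.precision  (default 6)
    #   expr.literal.time.nano = nanos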
5 changes: 5 additions & 0 deletions python/pyspark/sql/connect/types.py
@@ -29,6 +29,7 @@
IntegerType,
FloatType,
DateType,
TimeType,
TimestampType,
TimestampNTZType,
DayTimeIntervalType,
@@ -151,6 +152,8 @@ def pyspark_types_to_proto_types(data_type: DataType) -> pb2.DataType:
ret.decimal.precision = data_type.precision
elif isinstance(data_type, DateType):
ret.date.CopyFrom(pb2.DataType.Date())
elif isinstance(data_type, TimeType):
ret.time.CopyFrom(pb2.DataType.Time())
elif isinstance(data_type, TimestampType):
ret.timestamp.CopyFrom(pb2.DataType.Timestamp())
elif isinstance(data_type, TimestampNTZType):
@@ -237,6 +240,8 @@ def proto_schema_to_pyspark_data_type(schema: pb2.DataType) -> DataType:
return VarcharType(schema.var_char.length)
elif schema.HasField("date"):
return DateType()
elif schema.HasField("time"):
return TimeType()
elif schema.HasField("timestamp"):
return TimestampType()
elif schema.HasField("timestamp_ntz"):
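A minimal roundtrip sketch for the two converters above (assuming this PR's proto changes are applied; the function names and import path come from the diff). Note that as written, pyspark_types_to_proto_types does not copy precision into pb2.DataType.Time(), so the reconstructed TimeType falls back to the default precision of 6.

    from pyspark.sql.connect.types import (
        proto_schema_to_pyspark_data_type,
        pyspark_types_to_proto_types,
    )
    from pyspark.sql.types import TimeType

    proto = pyspark_types_to_proto_types(TimeType())
    assert proto.HasField("time")

    restored = proto_schema_to_pyspark_data_type(proto)
    assert isinstance(restored, TimeType) and restored.precision == 6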
10 changes: 10 additions & 0 deletions python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -1508,6 +1508,16 @@ def condition():

eventually(catch_assertions=True)(condition)()

def test_time_lit(self) -> None:
Review comment (Contributor): This test seems redundant; tests in test_column and test_functions will be reused in connect mode.

Reply (Contributor): This test file is mainly for directly comparing behavior between connect and classic.

# SPARK-52779: Test TimeType lit
ndf = self.connect.range(1).select(CF.lit(datetime.time(12, 13, 14)))
df = self.spark.sql("SELECT TIME '12:13:14'")

self.assert_eq(
ndf.toPandas(),
df.toPandas(),
)


if __name__ == "__main__":
from pyspark.sql.tests.connect.test_connect_basic import * # noqa: F401
3 changes: 3 additions & 0 deletions python/pyspark/sql/tests/connect/test_connect_column.py
@@ -25,6 +25,7 @@
MapType,
NullType,
DateType,
TimeType,
TimestampType,
TimestampNTZType,
ByteType,
@@ -396,6 +397,7 @@ def test_literal_with_acceptable_type(self):
("sss", StringType()),
(datetime.date(2022, 12, 13), DateType()),
(datetime.datetime.now(), DateType()),
(datetime.time(1, 0, 0), TimeType()),
(datetime.datetime.now(), TimestampType()),
(datetime.datetime.now(), TimestampNTZType()),
(datetime.timedelta(1, 2, 3), DayTimeIntervalType()),
@@ -441,6 +443,7 @@ def test_literal_null(self):
DoubleType(),
DecimalType(),
DateType(),
TimeType(),
TimestampType(),
TimestampNTZType(),
DayTimeIntervalType(),
7 changes: 6 additions & 1 deletion python/pyspark/sql/tests/connect/test_connect_plan.py
@@ -893,7 +893,7 @@ def test_float_nan_inf(self):
self.assertIsNotNone(inf_lit.to_plan(None))

def test_datetime_literal_types(self):
"""Test the different timestamp, date, and timedelta types."""
"""Test the different timestamp, date, time, and timedelta types."""
datetime_lit = lit(datetime.datetime.now())

p = datetime_lit.to_plan(None)
@@ -908,6 +908,10 @@ def test_datetime_literal_types(self):
# (24 * 3600 + 2) * 1000000 + 3
self.assertEqual(86402000003, time_delta.to_plan(None).literal.day_time_interval)

time_lit = lit(datetime.time(23, 59, 59, 999999))
self.assertIsNotNone(time_lit.to_plan(None))
self.assertEqual(time_lit.to_plan(None).literal.time.nano, 86399999999000)

def test_list_to_literal(self):
"""Test conversion of lists to literals"""
empty_list = []
@@ -1024,6 +1028,7 @@ def test_literal_to_any_conversion(self):
decimal.Decimal(1.234567),
"sss",
datetime.date(2022, 12, 13),
datetime.time(12, 13, 14),
datetime.datetime.now(),
datetime.timedelta(1, 2, 3),
[1, 2, 3, 4, 5, 6],
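For reference, the expected time.nano value in test_datetime_literal_types above can be verified with the same arithmetic TimeType.toInternal uses; a standalone check, not part of the PR:

    import datetime

    t = datetime.time(23, 59, 59, 999999)
    nanos = (
        t.hour * 3_600_000_000_000
        + t.minute * 60_000_000_000
        + t.second * 1_000_000_000
        + t.microsecond * 1_000
    )
    assert nanos == 86399999999000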
32 changes: 32 additions & 0 deletions python/pyspark/sql/types.py
@@ -80,6 +80,7 @@
"BinaryType",
"BooleanType",
"DateType",
"TimeType",
"TimestampType",
"TimestampNTZType",
"DecimalType",
@@ -384,6 +385,33 @@ def fromInternal(self, v: int) -> datetime.date:
return datetime.date.fromordinal(v + self.EPOCH_ORDINAL)


class TimeType(AtomicType):
Review comment (Contributor): This PR actually adds a new datatype to pyspark, not just the python client. It should work with both the python client and pyspark classic. We'd better also add a test for pyspark classic.

Review comment (Contributor, zhengruifeng, Jul 17, 2025): And I just noticed that the class hierarchy is not consistent with the JVM side:

    case class TimeType(precision: Int) extends AnyTimeType

We'd better make them the same by also introducing AnyTimeType in pyspark. If we need to touch other existing date time types, we can do it in a separate PR.

Reply (Member, Author): I'm not very familiar with PySpark; this will take some time. I've changed the PR to WIP for now.

"""Time (datetime.time) data type."""

def __init__(self, precision: int = 6):
self.precision = precision

def needConversion(self) -> bool:
return True

def toInternal(self, t: datetime.time) -> int:
if t is not None:
return (
t.hour * 3_600_000_000_000
+ t.minute * 60_000_000_000
+ t.second * 1_000_000_000
+ t.microsecond * 1_000
)

def fromInternal(self, nano: int) -> datetime.time:
if nano is not None:
hours, remainder = divmod(nano, 3_600_000_000_000)
minutes, remainder = divmod(remainder, 60_000_000_000)
seconds, remainder = divmod(remainder, 1_000_000_000)
microseconds = remainder // 1_000
return datetime.time(hours, minutes, seconds, microseconds)
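A quick roundtrip check of the conversion pair above (a standalone sketch: TimeType stores the time of day as nanoseconds since midnight, and fromInternal truncates anything below microsecond resolution):

    import datetime

    from pyspark.sql.types import TimeType

    t = datetime.time(12, 13, 14, 500000)
    internal = TimeType().toInternal(t)  # nanoseconds since midnight
    assert internal == 43_994_500_000_000
    assert TimeType().fromInternal(internal) == t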


class TimestampType(AtomicType, metaclass=DataTypeSingleton):
"""Timestamp (datetime.datetime) data type."""

@@ -1846,6 +1874,7 @@ def parseJson(cls, json_str: str) -> "VariantVal":
_FIXED_DECIMAL = re.compile(r"decimal\(\s*(\d+)\s*,\s*(-?\d+)\s*\)")
_INTERVAL_DAYTIME = re.compile(r"interval (day|hour|minute|second)( to (day|hour|minute|second))?")
_INTERVAL_YEARMONTH = re.compile(r"interval (year|month)( to (year|month))?")
_TIME = re.compile(r"time\(\s*(\d+)\s*\)")

_COLLATIONS_METADATA_KEY = "__COLLATIONS"

@@ -1987,6 +2016,9 @@ def _parse_datatype_json_value(
elif _FIXED_DECIMAL.match(json_value):
m = _FIXED_DECIMAL.match(json_value)
return DecimalType(int(m.group(1)), int(m.group(2))) # type: ignore[union-attr]
elif _TIME.match(json_value):
m = _TIME.match(json_value)
return TimeType(int(m.group(1))) # type: ignore[union-attr]
elif _INTERVAL_DAYTIME.match(json_value):
m = _INTERVAL_DAYTIME.match(json_value)
inverted_fields = DayTimeIntervalType._inverted_fields
Expand Down