Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dev/sparktestsupport/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,8 @@ def __hash__(self):
"pyspark.sql.tests.test_session",
"pyspark.sql.tests.test_subquery",
"pyspark.sql.tests.test_types",
"pyspark.sql.tests.test_geographytype",
"pyspark.sql.tests.test_geometrytype",
"pyspark.sql.tests.test_udf",
"pyspark.sql.tests.test_udf_combinations",
"pyspark.sql.tests.test_udf_profiler",
Expand Down
2 changes: 2 additions & 0 deletions python/docs/source/reference/pyspark.sql/data_types.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ Data Types
DecimalType
DoubleType
FloatType
GeographyType
GeometryType
IntegerType
LongType
MapType
Expand Down
18 changes: 18 additions & 0 deletions python/pyspark/errors/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,24 @@
"Cannot serialize the function `<name>`. If you accessed the Spark session, or a DataFrame defined outside of the function, or any object that contains a Spark session, please be aware that they are not allowed in Spark Connect. For `foreachBatch`, please access the Spark session using `df.sparkSession`, where `df` is the first parameter in your `foreachBatch` function. For `StreamingQueryListener`, please access the Spark session using `self.spark`. For details please check out the PySpark doc for `foreachBatch` and `StreamingQueryListener`."
]
},
"ST_INVALID_ALGORITHM_VALUE" : {
"message" : [
"Invalid or unsupported edge interpolation algorithm value: '<alg>'."
],
"sqlState" : "22023"
},
"ST_INVALID_CRS_VALUE" : {
"message" : [
"Invalid or unsupported CRS (coordinate reference system) value: '<crs>'."
],
"sqlState" : "22023"
},
"ST_INVALID_SRID_VALUE" : {
"message" : [
"Invalid or unsupported SRID (spatial reference identifier) value: <srid>."
],
"sqlState" : "22023"
},
"TEST_CLASS_NOT_COMPILED": {
"message": [
"<test_class_path> doesn't exist. Spark sql test classes are not compiled."
Expand Down
96 changes: 96 additions & 0 deletions python/pyspark/sql/tests/test_geographytype.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# -*- encoding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pyspark.sql.types import GeographyType
from pyspark.sql.utils import IllegalArgumentException
from pyspark.testing.sqlutils import ReusedSQLTestCase


class GeographyTypeTestMixin:
# Test cases for GeographyType construction based on SRID.

def test_geographytype_specified_valid_srid(self):
"""Test that GeographyType is constructed correctly when a valid SRID is specified."""

supported_srid = {4326: "OGC:CRS84"}

for srid, crs in supported_srid.items():
geography_type = GeographyType(srid)
self.assertEqual(geography_type.srid, srid)
self.assertEqual(geography_type.typeName(), "geography")
self.assertEqual(geography_type.simpleString(), f"geography({srid})")
self.assertEqual(geography_type.jsonValue(), f"geography({crs}, SPHERICAL)")
self.assertEqual(repr(geography_type), f"GeographyType({srid})")

def test_geographytype_specified_invalid_srid(self):
"""Test that the correct error is returned when an invalid SRID value is specified."""

for srid in [-4612, -4326, -2, -1, 1, 2]:
with self.assertRaises(IllegalArgumentException) as error_context:
GeographyType(srid)
srid_header = "[ST_INVALID_SRID_VALUE] Invalid or unsupported SRID"
self.assertEqual(
str(error_context.exception),
f"{srid_header} (spatial reference identifier) value: {srid}.",
)

# Special string value "ANY" in place of SRID is used to denote a mixed GEOGRAPHY type.

def test_geographytype_any_specifier(self):
"""Test that GeographyType is constructed correctly with ANY specifier for mixed SRID."""

geography_type = GeographyType("ANY")
self.assertEqual(geography_type.srid, GeographyType.MIXED_SRID)
self.assertEqual(geography_type.typeName(), "geography")
self.assertEqual(geography_type.simpleString(), "geography(any)")
self.assertEqual(repr(geography_type), "GeographyType(ANY)")

# The tests below verify GEOGRAPHY type object equality based on SRID values.

def test_geographytype_same_srid_values(self):
"""Test that two GeographyTypes with specified SRIDs have the same SRID values."""

for srid in [4326]:
geography_type_1 = GeographyType(srid)
geography_type_2 = GeographyType(srid)
self.assertEqual(geography_type_1.srid, geography_type_2.srid)

def test_geographytype_different_srid_values(self):
"""Test that two GeographyTypes with specified SRIDs have different SRID values."""

for srid in [4326]:
geography_type_1 = GeographyType(srid)
geography_type_2 = GeographyType("ANY")
self.assertNotEqual(geography_type_1.srid, geography_type_2.srid)


class GeographyTypeTest(GeographyTypeTestMixin, ReusedSQLTestCase):
pass


if __name__ == "__main__":
import unittest
from pyspark.sql.tests.test_geographytype import * # noqa: F401

try:
import xmlrunner

testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)
96 changes: 96 additions & 0 deletions python/pyspark/sql/tests/test_geometrytype.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# -*- encoding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pyspark.sql.types import GeometryType
from pyspark.sql.utils import IllegalArgumentException
from pyspark.testing.sqlutils import ReusedSQLTestCase


class GeometryTypeTestMixin:
# Test cases for GeometryType construction based on SRID.

def test_geometrytype_specified_valid_srid(self):
"""Test that GeometryType is constructed correctly when a valid SRID is specified."""

supported_srid = {4326: "OGC:CRS84"}

for srid, crs in supported_srid.items():
geometry_type = GeometryType(srid)
self.assertEqual(geometry_type.srid, srid)
self.assertEqual(geometry_type.typeName(), "geometry")
self.assertEqual(geometry_type.simpleString(), f"geometry({srid})")
self.assertEqual(geometry_type.jsonValue(), f"geometry({crs})")
self.assertEqual(repr(geometry_type), f"GeometryType({srid})")

def test_geometrytype_specified_invalid_srid(self):
"""Test that the correct error is returned when an invalid SRID value is specified."""

for srid in [-4612, -4326, -2, -1, 1, 2]:
with self.assertRaises(IllegalArgumentException) as error_context:
GeometryType(srid)
srid_header = "[ST_INVALID_SRID_VALUE] Invalid or unsupported SRID"
self.assertEqual(
str(error_context.exception),
f"{srid_header} (spatial reference identifier) value: {srid}.",
)

# Special string value "ANY" in place of SRID is used to denote a mixed GEOMETRY type.

def test_geometrytype_any_specifier(self):
"""Test that GeometryType is constructed correctly with ANY specifier for mixed SRID."""

geometry_type = GeometryType("ANY")
self.assertEqual(geometry_type.srid, GeometryType.MIXED_SRID)
self.assertEqual(geometry_type.typeName(), "geometry")
self.assertEqual(geometry_type.simpleString(), "geometry(any)")
self.assertEqual(repr(geometry_type), "GeometryType(ANY)")

# The tests below verify GEOMETRY type object equality based on SRID values.

def test_geometrytype_same_srid_values(self):
"""Test that two GeometryTypes with specified SRIDs have the same SRID values."""

for srid in [4326]:
geometry_type_1 = GeometryType(srid)
geometry_type_2 = GeometryType(srid)
self.assertEqual(geometry_type_1.srid, geometry_type_2.srid)

def test_geometrytype_different_srid_values(self):
"""Test that two GeometryTypes with specified SRIDs have different SRID values."""

for srid in [4326]:
geometry_type_1 = GeometryType(srid)
geometry_type_2 = GeometryType("ANY")
self.assertNotEqual(geometry_type_1.srid, geometry_type_2.srid)


class GeometryTypeTest(GeometryTypeTestMixin, ReusedSQLTestCase):
pass


if __name__ == "__main__":
import unittest
from pyspark.sql.tests.test_geometrytype import * # noqa: F401

try:
import xmlrunner

testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)
98 changes: 98 additions & 0 deletions python/pyspark/sql/tests/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
DecimalType,
BinaryType,
BooleanType,
GeographyType,
GeometryType,
NullType,
VariantType,
VariantVal,
Expand Down Expand Up @@ -921,6 +923,98 @@ def test_schema_with_bad_collations_provider(self):

self.assertRaises(PySparkValueError, lambda: _parse_datatype_json_string(schema_json))

def test_geography_json_serde(self):
from pyspark.sql.types import _parse_datatype_json_value, _parse_datatype_json_string

valid_test_cases = [
("geography", GeographyType(4326)),
("geography(OGC:CRS84)", GeographyType(4326)),
("geography(OGC:CRS84, SPHERICAL)", GeographyType(4326)),
("geography(SPHERICAL)", GeographyType(4326)),
("geography(SRID:ANY)", GeographyType("ANY")),
("geography(srid:any)", GeographyType("ANY")),
]
for json, expected in valid_test_cases:
python_datatype = _parse_datatype_json_value(json)
self.assertEqual(python_datatype, expected)
self.assertEqual(expected, _parse_datatype_json_string(expected.json()))

invalid_test_cases = [
"geography()",
"geography(())",
"geography(0)",
"geography(1)",
"geography(3857)",
"geography(4326)",
"geography(ANY)",
"geography(any)",
"geography(SRID)",
"geography(srid)",
"geography(CRS)",
"geography(crs)",
"geography(asdf)",
"geography(asdf:fdsa)",
"geography(123:123)",
"geography(srid:srid)",
"geography(SRID:0)",
"geography(SRID:1)",
"geography(SRID:123)",
"geography(EPSG:123)",
"geography(ESRI:123)",
"geography(OCG:123)",
"geography(OCG:CRS123)",
"geography(SRID:0,)",
"geography(SRID0)",
"geography(SRID:4326, ALG)",
]
for json in invalid_test_cases:
with self.assertRaises(Exception):
_parse_datatype_json_value(json)

def test_geometry_json_serde(self):
from pyspark.sql.types import _parse_datatype_json_value, _parse_datatype_json_string

valid_test_cases = [
("geometry", GeometryType(4326)),
("geometry(OGC:CRS84)", GeometryType(4326)),
("geometry(SRID:ANY)", GeometryType("ANY")),
("geometry(srid:any)", GeometryType("ANY")),
]
for json, expected in valid_test_cases:
python_datatype = _parse_datatype_json_value(json)
self.assertEqual(python_datatype, expected)
self.assertEqual(expected, _parse_datatype_json_string(expected.json()))

invalid_test_cases = [
"geometry()",
"geometry(())",
"geometry(0)",
"geometry(1)",
"geometry(3857)",
"geometry(4326)",
"geometry(ANY)",
"geometry(any)",
"geometry(SRID)",
"geometry(srid)",
"geometry(CRS)",
"geometry(crs)",
"geometry(asdf)",
"geometry(asdf:fdsa)",
"geometry(123:123)",
"geometry(srid:srid)",
"geometry(SRID:1)",
"geometry(SRID:123)",
"geometry(EPSG:123)",
"geometry(ESRI:123)",
"geometry(OCG:123)",
"geometry(OCG:CRS123)",
"geometry(SRID:0,)",
"geometry(SRID0)",
]
for json in invalid_test_cases:
with self.assertRaises(Exception):
_parse_datatype_json_value(json)

def test_udt(self):
from pyspark.sql.types import _parse_datatype_json_string, _infer_type, _make_type_verifier

Expand Down Expand Up @@ -1268,6 +1362,10 @@ def test_parse_datatype_json_string(self):
TimestampType(),
TimestampNTZType(),
NullType(),
GeographyType(4326),
GeographyType("ANY"),
GeometryType(4326),
GeometryType("ANY"),
VariantType(),
YearMonthIntervalType(),
YearMonthIntervalType(YearMonthIntervalType.YEAR),
Expand Down
Loading