diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/GeographyVal.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/GeographyVal.java
new file mode 100644
index 000000000000..48dc6f896e91
--- /dev/null
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/GeographyVal.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.types;
+
+import java.io.Serializable;
+
+/**
+ * Physical (in-memory) representation of a value of the GEOGRAPHY data type.
+ * The value is held as a byte array; this class does not interpret its contents.
+ */
+public final class GeographyVal implements Comparable<GeographyVal>, Serializable {
+
+  // The GEOGRAPHY type is implemented as a byte array. We provide `getBytes` and `fromBytes`
+  // methods for readers and writers to access this underlying array of bytes.
+  private final byte[] value;
+
+  // We make the constructor private. We should use `fromBytes` to create new instances.
+  private GeographyVal(byte[] value) {
+    this.value = value;
+  }
+
+  /** Returns the underlying byte array itself (no defensive copy is made). */
+  public byte[] getBytes() {
+    return value;
+  }
+
+  /**
+   * Wraps the given byte array (without copying it) in a new {@code GeographyVal}.
+   * Returns {@code null} when {@code bytes} is {@code null}.
+   */
+  public static GeographyVal fromBytes(byte[] bytes) {
+    if (bytes == null) {
+      return null;
+    } else {
+      return new GeographyVal(bytes);
+    }
+  }
+
+  /**
+   * Comparison is not yet supported for GEOGRAPHY.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public int compareTo(GeographyVal g) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/GeometryVal.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/GeometryVal.java
new file mode 100644
index 000000000000..2bb7f194c940
--- /dev/null
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/GeometryVal.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.types;
+
+import java.io.Serializable;
+
+/**
+ * Physical (in-memory) representation of a value of the GEOMETRY data type.
+ * The value is held as a byte array; this class does not interpret its contents.
+ */
+public final class GeometryVal implements Comparable<GeometryVal>, Serializable {
+
+  // The GEOMETRY type is implemented as a byte array. We provide `getBytes` and `fromBytes`
+  // methods for readers and writers to access this underlying array of bytes.
+  private final byte[] value;
+
+  // We make the constructor private. We should use `fromBytes` to create new instances.
+  private GeometryVal(byte[] value) {
+    this.value = value;
+  }
+
+  /** Returns the underlying byte array itself (no defensive copy is made). */
+  public byte[] getBytes() {
+    return value;
+  }
+
+  /**
+   * Wraps the given byte array (without copying it) in a new {@code GeometryVal}.
+   * Returns {@code null} when {@code bytes} is {@code null}.
+   */
+  public static GeometryVal fromBytes(byte[] bytes) {
+    if (bytes == null) {
+      return null;
+    } else {
+      return new GeometryVal(bytes);
+    }
+  }
+
+  /**
+   * Comparison is not yet supported for GEOMETRY.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public int compareTo(GeometryVal g) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/GeographyValSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/GeographyValSuite.java
new file mode 100644
index 000000000000..639a8b2f7782
--- /dev/null
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/GeographyValSuite.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.types;
+
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GeographyValSuite {
+
+  @Test
+  public void roundTripBytes() {
+    // A simple byte array to test the round trip (`fromBytes` -> `getBytes`).
+    byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6 };
+    GeographyVal geographyVal = GeographyVal.fromBytes(bytes);
+    assertNotNull(geographyVal);
+    assertArrayEquals(bytes, geographyVal.getBytes());
+  }
+
+  @Test
+  public void roundNullHandling() {
+    // A simple null byte array to test null handling for GEOGRAPHY.
+    byte[] bytes = null;
+    GeographyVal geographyVal = GeographyVal.fromBytes(bytes);
+    assertNull(geographyVal);
+  }
+
+  @Test
+  public void testCompareTo() {
+    // Comparison is not yet supported for GEOGRAPHY, so `compareTo` must always throw.
+    byte[] bytes1 = new byte[] { 1, 2, 3 };
+    byte[] bytes2 = new byte[] { 4, 5, 6 };
+    GeographyVal geographyVal1 = GeographyVal.fromBytes(bytes1);
+    GeographyVal geographyVal2 = GeographyVal.fromBytes(bytes2);
+    assertThrows(UnsupportedOperationException.class,
+      () -> geographyVal1.compareTo(geographyVal2));
+  }
+}
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/GeometryValSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/GeometryValSuite.java
new file mode 100644
index 000000000000..e38c6903e6dd
--- /dev/null
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/GeometryValSuite.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.types;
+
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class GeometryValSuite {
+
+  @Test
+  public void roundTripBytes() {
+    // A simple byte array to test the round trip (`fromBytes` -> `getBytes`).
+    byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6 };
+    GeometryVal geometryVal = GeometryVal.fromBytes(bytes);
+    assertNotNull(geometryVal);
+    assertArrayEquals(bytes, geometryVal.getBytes());
+  }
+
+  @Test
+  public void roundNullHandling() {
+    // A simple null byte array to test null handling for GEOMETRY.
+    byte[] bytes = null;
+    GeometryVal geometryVal = GeometryVal.fromBytes(bytes);
+    assertNull(geometryVal);
+  }
+
+  @Test
+  public void testCompareTo() {
+    // Comparison is not yet supported for GEOMETRY, so `compareTo` must always throw.
+    byte[] bytes1 = new byte[] { 1, 2, 3 };
+    byte[] bytes2 = new byte[] { 4, 5, 6 };
+    GeometryVal geometryVal1 = GeometryVal.fromBytes(bytes1);
+    GeometryVal geometryVal2 = GeometryVal.fromBytes(bytes2);
+    assertThrows(UnsupportedOperationException.class,
+      () -> geometryVal1.compareTo(geometryVal2));
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
index 1084e9973151..466fa2c0c8c7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/PhysicalDataType.scala
@@ -23,8 +23,8 @@ import scala.reflect.runtime.universe.typeTag
 import org.apache.spark.sql.catalyst.expressions.{Ascending, BoundReference, InterpretedOrdering, SortOrder}
 import org.apache.spark.sql.catalyst.util.{ArrayData, CollationFactory, MapData, SQLOrderingUtil}
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteExactNumeric, ByteType, CalendarIntervalType, CharType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalExactNumeric, DecimalType, DoubleExactNumeric, DoubleType, FloatExactNumeric, FloatType, FractionalType, IntegerExactNumeric, IntegerType, IntegralType, LongExactNumeric, LongType, MapType, NullType, NumericType, ShortExactNumeric, ShortType, StringType, StructField, StructType, TimestampNTZType, TimestampType, TimeType, VarcharType, VariantType, YearMonthIntervalType}
-import org.apache.spark.unsafe.types.{ByteArray, UTF8String, VariantVal}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteExactNumeric, ByteType, CalendarIntervalType, CharType, DataType, DateType, DayTimeIntervalType, Decimal, DecimalExactNumeric, DecimalType, DoubleExactNumeric, DoubleType, FloatExactNumeric, FloatType, FractionalType, GeographyType, GeometryType, IntegerExactNumeric, IntegerType, IntegralType, LongExactNumeric, LongType, MapType, NullType, NumericType, ShortExactNumeric, ShortType, StringType, StructField, StructType, TimestampNTZType, TimestampType, TimeType, VarcharType, VariantType, YearMonthIntervalType}
+import org.apache.spark.unsafe.types.{ByteArray, GeographyVal, GeometryVal, UTF8String, VariantVal}
 import org.apache.spark.util.ArrayImplicits._
 
 sealed abstract class PhysicalDataType {
@@ -59,6 +59,8 @@ object PhysicalDataType {
     case StructType(fields) => PhysicalStructType(fields)
     case MapType(keyType, valueType, valueContainsNull) =>
       PhysicalMapType(keyType, valueType, valueContainsNull)
+    case _: GeometryType => PhysicalGeometryType
+    case _: GeographyType => PhysicalGeographyType
     case VariantType => PhysicalVariantType
     case _ => UninitializedPhysicalType
   }
@@ -411,3 +413,21 @@ object UninitializedPhysicalType extends PhysicalDataType {
   override private[sql] type InternalType = Any
   @transient private[sql] lazy val tag = typeTag[InternalType]
 }
+
+// Physical type for GEOGRAPHY; values are represented internally as `GeographyVal`.
+case class PhysicalGeographyType() extends PhysicalDataType {
+  private[sql] type InternalType = GeographyVal
+  @transient private[sql] lazy val tag = typeTag[InternalType]
+  private[sql] val ordering = implicitly[Ordering[InternalType]]
+}
+
+object PhysicalGeographyType extends PhysicalGeographyType
+
+// Physical type for GEOMETRY; values are represented internally as `GeometryVal`.
+case class PhysicalGeometryType() extends PhysicalDataType {
+  private[sql] type InternalType = GeometryVal
+  @transient private[sql] lazy val tag = typeTag[InternalType]
+  private[sql] val ordering = implicitly[Ordering[InternalType]]
+}
+
+object PhysicalGeometryType extends PhysicalGeometryType
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/types/GeographyTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/types/GeographyTypeSuite.scala
index 51de95826f81..ca377e739a0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/types/GeographyTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/types/GeographyTypeSuite.scala
@@ -23,6 +23,7 @@ import org.json4s.JsonAST.JString
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalGeographyType}
 
 class GeographyTypeSuite extends SparkFunSuite {
 
@@ -216,4 +217,15 @@ class GeographyTypeSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("PhysicalDataType maps GeographyType to PhysicalGeographyType") {
+    val geographyTypes: Seq[DataType] = Seq(
+      GeographyType(4326),
+      GeographyType("ANY")
+    )
+    geographyTypes.foreach { geographyType =>
+      val pdt = PhysicalDataType(geographyType)
+      assert(pdt.isInstanceOf[PhysicalGeographyType])
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/types/GeometryTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/types/GeometryTypeSuite.scala
index a6961f0c0343..9ae96eedef41 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/types/GeometryTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/types/GeometryTypeSuite.scala
@@ -23,6 +23,7 @@ import org.json4s.JsonAST.JString
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.catalyst.types.{PhysicalDataType, PhysicalGeometryType}
 
 class GeometryTypeSuite extends SparkFunSuite {
 
@@ -200,4 +201,17 @@ class GeometryTypeSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("PhysicalDataType maps GeometryType to PhysicalGeometryType") {
+    val geometryTypes: Seq[DataType] = Seq(
+      GeometryType(0),
+      GeometryType(3857),
+      GeometryType(4326),
+      GeometryType("ANY")
+    )
+    geometryTypes.foreach { geometryType =>
+      val pdt = PhysicalDataType(geometryType)
+      assert(pdt.isInstanceOf[PhysicalGeometryType])
+    }
+  }
 }