From abaa9dc6a7ff7f1f2c5617a910ea1b8930a44398 Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Thu, 8 Jan 2026 19:17:05 -0800 Subject: [PATCH 1/2] Add geometry type support to Iceberg connector Implement reading and writing of geometry columns in Iceberg tables using the Iceberg v3 geometry type specification. --- .../geospatial/serde/JtsGeometrySerde.java | 110 ++++++++ .../serde/TestGeometrySerialization.java | 28 ++ .../geospatial/TestSphericalGeoFunctions.java | 7 +- plugin/trino-iceberg/pom.xml | 11 + .../plugin/iceberg/ExpressionConverter.java | 6 + .../trino/plugin/iceberg/GeoSpatialUtils.java | 49 ++++ .../iceberg/IcebergFileWriterFactory.java | 93 ++++++- .../trino/plugin/iceberg/IcebergMetadata.java | 5 +- .../trino/plugin/iceberg/IcebergPageSink.java | 147 ++++++++++ .../iceberg/IcebergPageSourceProvider.java | 199 ++++++++++++-- .../io/trino/plugin/iceberg/IcebergTypes.java | 13 + .../trino/plugin/iceberg/TypeConverter.java | 39 +++ .../plugin/iceberg/util/HiveSchemaUtil.java | 4 +- .../plugin/iceberg/util/OrcTypeConverter.java | 4 +- .../trino/plugin/iceberg/TestIcebergV3.java | 257 ++++++++++++++++++ .../plugin/iceberg/TestTypeConverter.java | 49 ++++ 16 files changed, 991 insertions(+), 30 deletions(-) create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/GeoSpatialUtils.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestTypeConverter.java diff --git a/lib/trino-geospatial-toolkit/src/main/java/io/trino/geospatial/serde/JtsGeometrySerde.java b/lib/trino-geospatial-toolkit/src/main/java/io/trino/geospatial/serde/JtsGeometrySerde.java index 23d2b7aea299..1b7c2720e51a 100644 --- a/lib/trino-geospatial-toolkit/src/main/java/io/trino/geospatial/serde/JtsGeometrySerde.java +++ b/lib/trino-geospatial-toolkit/src/main/java/io/trino/geospatial/serde/JtsGeometrySerde.java @@ -24,9 +24,11 @@ import org.locationtech.jts.io.WKBReader; import org.locationtech.jts.io.WKBWriter; +import static 
com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verify; import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT; import static java.lang.String.format; +import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; /** @@ -36,6 +38,10 @@ public final class JtsGeometrySerde { private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory(); + public static final int OGC_CRS84_SRID = 4326; + + // EWKB flag for SRID presence (bit 29) + private static final int EWKB_SRID_FLAG = 0x20000000; // WKB type codes (2D) private static final int WKB_POINT = 1; @@ -184,4 +190,108 @@ public static Slice serializeBinaryOp(Geometry result, Geometry left, Geometry r result.setSRID(validateAndGetSrid(left, right)); return serialize(result); } + + /** + * Extract SRID from EWKB without full parsing. + * Returns 0 if no SRID is embedded. + */ + public static int extractSrid(Slice ewkb) + { + if (ewkb.length() < 9) { + return 0; + } + boolean bigEndian = ewkb.getByte(0) == 0; + int type = ewkb.getInt(1); + if (bigEndian) { + type = Integer.reverseBytes(type); + } + if ((type & EWKB_SRID_FLAG) == 0) { + return 0; + } + int srid = ewkb.getInt(5); + if (bigEndian) { + srid = Integer.reverseBytes(srid); + } + return srid; + } + + /** + * Strip SRID from EWKB to produce standard WKB. + * If the input is already standard WKB (no SRID flag), returns it unchanged. + */ + public static Slice ewkbToWkb(Slice ewkb) + { + if (ewkb.length() < 9) { + return ewkb; + } + boolean bigEndian = ewkb.getByte(0) == 0; + int type = ewkb.getInt(1); + if (bigEndian) { + type = Integer.reverseBytes(type); + } + if ((type & EWKB_SRID_FLAG) == 0) { + return ewkb; + } + + // Strip SRID flag and 4 SRID bytes + int newType = type & ~EWKB_SRID_FLAG; + Slice wkb = Slices.allocate(ewkb.length() - 4); + wkb.setByte(0, ewkb.getByte(0)); // endianness + wkb.setInt(1, bigEndian ? 
Integer.reverseBytes(newType) : newType); + wkb.setBytes(5, ewkb, 9, ewkb.length() - 9); // geometry data + return wkb; + } + + /** + * Convert a CRS string to an SRID integer. + * Supports formats: + * - "EPSG:XXXX" → XXXX + * - "OGC:CRS84" or "CRS84" → 4326 (WGS84) + */ + public static int crsToSrid(String crs) + { + if (crs == null || crs.isEmpty()) { + return 0; + } + String upperCrs = crs.toUpperCase(ENGLISH); + if (upperCrs.equals("OGC:CRS84") || upperCrs.equals("CRS84")) { + return OGC_CRS84_SRID; // WGS84 + } + if (upperCrs.startsWith("EPSG:")) { + try { + int srid = Integer.parseInt(crs.substring(5)); + if (srid <= 0) { + throw new IllegalArgumentException("Invalid EPSG code: " + crs); + } + return srid; + } + catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid EPSG code: " + crs); + } + } + throw new IllegalArgumentException("Unsupported CRS format: " + crs); + } + + /** + * Inject SRID into WKB to produce EWKB. + */ + public static Slice wkbToEwkb(Slice wkb, int srid) + { + checkArgument(wkb.length() >= 5, "WKB too short"); + boolean bigEndian = wkb.getByte(0) == 0; + int type = wkb.getInt(1); + if (bigEndian) { + type = Integer.reverseBytes(type); + } + checkArgument((type & EWKB_SRID_FLAG) == 0, "Input already has SRID flag set (expected WKB, got EWKB)"); + + // Add SRID flag + int newType = type | EWKB_SRID_FLAG; + Slice ewkb = Slices.allocate(wkb.length() + 4); + ewkb.setByte(0, wkb.getByte(0)); // endianness + ewkb.setInt(1, bigEndian ? Integer.reverseBytes(newType) : newType); + ewkb.setInt(5, bigEndian ? 
Integer.reverseBytes(srid) : srid); + ewkb.setBytes(9, wkb, 5, wkb.length() - 5); // geometry data + return ewkb; + } } diff --git a/lib/trino-geospatial-toolkit/src/test/java/io/trino/geospatial/serde/TestGeometrySerialization.java b/lib/trino-geospatial-toolkit/src/test/java/io/trino/geospatial/serde/TestGeometrySerialization.java index ae2383c45d8c..9849d4dbaa67 100644 --- a/lib/trino-geospatial-toolkit/src/test/java/io/trino/geospatial/serde/TestGeometrySerialization.java +++ b/lib/trino-geospatial-toolkit/src/test/java/io/trino/geospatial/serde/TestGeometrySerialization.java @@ -30,10 +30,13 @@ import static io.trino.geospatial.GeometryType.MULTI_POLYGON; import static io.trino.geospatial.GeometryType.POINT; import static io.trino.geospatial.GeometryType.POLYGON; +import static io.trino.geospatial.serde.JtsGeometrySerde.crsToSrid; import static io.trino.geospatial.serde.JtsGeometrySerde.deserialize; import static io.trino.geospatial.serde.JtsGeometrySerde.deserializeEnvelope; import static io.trino.geospatial.serde.JtsGeometrySerde.deserializeType; +import static io.trino.geospatial.serde.JtsGeometrySerde.ewkbToWkb; import static io.trino.geospatial.serde.JtsGeometrySerde.serialize; +import static io.trino.geospatial.serde.JtsGeometrySerde.wkbToEwkb; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -145,6 +148,31 @@ public void testGeometryCollectionSridRoundTrip() testSerializationWithSrid("GEOMETRYCOLLECTION (POINT (1 2), LINESTRING (0 0, 1 2, 3 4))", 3857); } + @Test + public void testCrsToSridRejectsNonPositiveEpsgCodes() + { + assertThat(crsToSrid("EPSG:3857")).isEqualTo(3857); + assertThatThrownBy(() -> crsToSrid("EPSG:0")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid EPSG code: EPSG:0"); + assertThatThrownBy(() -> crsToSrid("EPSG:-3857")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid EPSG code: EPSG:-3857"); + } + + @Test + 
public void testWkbToEwkbRejectsEwkbInput() + { + Geometry geometry = createJtsGeometry("POINT (1 2)"); + geometry.setSRID(3857); + Slice ewkb = serialize(geometry); + + assertThat(ewkbToWkb(ewkb)).isNotEqualTo(ewkb); + assertThatThrownBy(() -> wkbToEwkb(ewkb, 3857)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Input already has SRID flag set (expected WKB, got EWKB)"); + } + @Test public void testEnvelope() { diff --git a/plugin/trino-geospatial/src/test/java/io/trino/plugin/geospatial/TestSphericalGeoFunctions.java b/plugin/trino-geospatial/src/test/java/io/trino/plugin/geospatial/TestSphericalGeoFunctions.java index cc8dcb33c197..edb390b248e1 100644 --- a/plugin/trino-geospatial/src/test/java/io/trino/plugin/geospatial/TestSphericalGeoFunctions.java +++ b/plugin/trino-geospatial/src/test/java/io/trino/plugin/geospatial/TestSphericalGeoFunctions.java @@ -35,7 +35,6 @@ import static com.google.common.io.Resources.getResource; import static io.airlift.slice.Slices.utf8Slice; import static io.trino.geospatial.serde.JtsGeometrySerde.serialize; -import static io.trino.plugin.geospatial.GeoTestUtils.spatiallyEquals; import static io.trino.plugin.geospatial.GeometryType.GEOMETRY; import static io.trino.plugin.geospatial.SphericalGeographyType.SPHERICAL_GEOGRAPHY; import static io.trino.spi.type.DoubleType.DOUBLE; @@ -91,11 +90,7 @@ public void testGetObjectValue() } Block block = builder.build(); for (int i = 0; i < wktList.size(); i++) { - String expected = wktList.get(i); - String actual = (String) SPHERICAL_GEOGRAPHY.getObjectValue(block, i); - assertThat(spatiallyEquals(expected, actual)) - .withFailMessage("Geometry mismatch at index %d!\nExpected: %s\nActual: %s", i, expected, actual) - .isTrue(); + assertThat(wktList.get(i)).isEqualTo(SPHERICAL_GEOGRAPHY.getObjectValue(block, i)); } } diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 3e988eed4a1f..a34c4531f8db 100644 --- a/plugin/trino-iceberg/pom.xml +++ 
b/plugin/trino-iceberg/pom.xml @@ -128,6 +128,11 @@ trino-filesystem-s3 + + io.trino + trino-geospatial-toolkit + + io.trino trino-hive @@ -566,6 +571,12 @@ test + + io.trino + trino-geospatial + test + + io.trino trino-hdfs diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ExpressionConverter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ExpressionConverter.java index 1d6fe24d4855..e8a8909ebd9f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ExpressionConverter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ExpressionConverter.java @@ -45,6 +45,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verify; import static io.trino.plugin.hive.util.HiveUtil.isStructuralType; +import static io.trino.plugin.iceberg.GeoSpatialUtils.isGeospatialType; import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId; import static io.trino.plugin.iceberg.util.Timestamps.compareTimestampNanosToRange; import static io.trino.plugin.iceberg.util.Timestamps.compareTimestampTzNanosToRange; @@ -98,6 +99,11 @@ public static boolean isConvertibleToIcebergExpression(Domain domain) return domain.isOnlyNull() || domain.getValues().isAll(); } + // Geometry and Geography types are not supported for predicate pushdown in Iceberg + if (isGeospatialType(domain.getType())) { + return false; + } + if (domain.getType() == UUID) { // Iceberg orders UUID values differently than Trino (perhaps due to https://bugs.openjdk.org/browse/JDK-7025832), so allow only IS NULL / IS NOT NULL checks return domain.isOnlyNull() || domain.getValues().isAll(); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/GeoSpatialUtils.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/GeoSpatialUtils.java new file mode 100644 index 000000000000..885fe4c983b8 --- /dev/null +++ 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/GeoSpatialUtils.java @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import io.trino.spi.type.StandardTypes; +import io.trino.spi.type.Type; +import io.trino.spi.type.TypeManager; +import io.trino.spi.type.TypeSignature; + +final class GeoSpatialUtils +{ + private GeoSpatialUtils() {} + + public static boolean isGeometryType(Type type) + { + return type.getBaseName().equals(StandardTypes.GEOMETRY); + } + + public static boolean isSphericalGeographyType(Type type) + { + return type.getBaseName().equals(StandardTypes.SPHERICAL_GEOGRAPHY); + } + + public static boolean isGeospatialType(Type type) + { + return isGeometryType(type) || isSphericalGeographyType(type); + } + + public static Type getGeometryType(TypeManager typeManager) + { + return typeManager.getType(new TypeSignature(StandardTypes.GEOMETRY)); + } + + public static Type getSphericalGeographyType(TypeManager typeManager) + { + return typeManager.getType(new TypeSignature(StandardTypes.SPHERICAL_GEOGRAPHY)); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java index 332b3117ecdc..c7014e113ceb 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java +++ 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java @@ -37,10 +37,15 @@ import io.trino.spi.NodeVersion; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.type.ArrayType; +import io.trino.spi.type.MapType; +import io.trino.spi.type.RowType; import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; +import io.trino.spi.type.VarbinaryType; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Type.TypeID; import org.apache.iceberg.types.Types; import org.weakref.jmx.Managed; @@ -58,6 +63,7 @@ import static io.trino.plugin.hive.HiveCompressionCodecs.toCompressionCodec; import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME; import static io.trino.plugin.hive.HiveMetadata.TRINO_VERSION_NAME; +import static io.trino.plugin.iceberg.GeoSpatialUtils.isGeospatialType; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITE_VALIDATION_FAILED; @@ -170,6 +176,7 @@ private IcebergFileWriter createParquetWriter( .collect(toImmutableList()); List fileColumnTypes = icebergSchema.columns().stream() .map(column -> toTrinoType(column.type(), typeManager)) + .map(this::toFileTrinoType) .collect(toImmutableList()); try { @@ -194,7 +201,7 @@ private IcebergFileWriter createParquetWriter( rollbackAction, fileColumnTypes, fileColumnNames, - convert(icebergSchema, "table"), + convert(toFileSchema(icebergSchema), "table"), makeTypeMap(fileColumnTypes, fileColumnNames), parquetWriterOptions, IntStream.range(0, fileColumnNames.size()).toArray(), @@ -228,6 +235,7 @@ private IcebergFileWriter createOrcWriter( List fileColumnTypes = columnFields.stream() .map(Types.NestedField::type) .map(type -> toTrinoType(type, typeManager)) + 
.map(this::toFileTrinoType) .collect(toImmutableList()); Optional> validationInputFactory = Optional.empty(); @@ -253,7 +261,7 @@ private IcebergFileWriter createOrcWriter( rollbackAction, fileColumnNames, fileColumnTypes, - toOrcType(icebergSchema), + toOrcType(toFileSchema(icebergSchema)), compressionCodec.getOrcCompressionKind(), withBloomFilterOptions(orcWriterOptions, storageProperties) .withStripeMinSize(getOrcWriterMinStripeSize(session)) @@ -305,6 +313,7 @@ private IcebergFileWriter createAvroWriter( List columnTypes = icebergSchema.columns().stream() .map(column -> toTrinoType(column.type(), typeManager)) + .map(this::toFileTrinoType) .collect(toImmutableList()); HiveCompressionCodec compressionCodec = getHiveCompressionCodec(AVRO, storageProperties) @@ -313,8 +322,86 @@ private IcebergFileWriter createAvroWriter( return new IcebergAvroFileWriter( new ForwardingOutputFile(fileSystem, outputPath), rollbackAction, - icebergSchema, + toFileSchema(icebergSchema), columnTypes, compressionCodec); } + + /** + * Convert an Iceberg schema for file writing by replacing GEOMETRY/GEOGRAPHY types with BINARY. + * File formats don't understand these types, but Iceberg table metadata preserves them. 
+ */ + private static Schema toFileSchema(Schema icebergSchema) + { + List columns = icebergSchema.columns().stream() + .map(IcebergFileWriterFactory::toFileType) + .collect(toImmutableList()); + return new Schema(columns); + } + + private static Types.NestedField toFileType(Types.NestedField field) + { + org.apache.iceberg.types.Type type = toFileType(field.type()); + if (type == field.type()) { + return field; + } + return Types.NestedField.of(field.fieldId(), field.isOptional(), field.name(), type, field.doc()); + } + + private static org.apache.iceberg.types.Type toFileType(org.apache.iceberg.types.Type type) + { + if (type.typeId() == TypeID.GEOMETRY || type.typeId() == TypeID.GEOGRAPHY) { + // Replace geometry/geography with binary for file writing + return Types.BinaryType.get(); + } + if (type instanceof Types.StructType structType) { + return Types.StructType.of(structType.fields().stream() + .map(IcebergFileWriterFactory::toFileType) + .collect(toImmutableList())); + } + if (type instanceof Types.ListType listType) { + org.apache.iceberg.types.Type elementType = toFileType(listType.elementType()); + if (elementType == listType.elementType()) { + return type; + } + if (listType.isElementOptional()) { + return Types.ListType.ofOptional(listType.elementId(), elementType); + } + return Types.ListType.ofRequired(listType.elementId(), elementType); + } + if (type instanceof Types.MapType mapType) { + org.apache.iceberg.types.Type keyType = toFileType(mapType.keyType()); + org.apache.iceberg.types.Type valueType = toFileType(mapType.valueType()); + if (keyType == mapType.keyType() && valueType == mapType.valueType()) { + return type; + } + if (mapType.isValueOptional()) { + return Types.MapType.ofOptional(mapType.keyId(), mapType.valueId(), keyType, valueType); + } + return Types.MapType.ofRequired(mapType.keyId(), mapType.valueId(), keyType, valueType); + } + return type; + } + + /** + * Convert Trino type for file writing - geometry/geography become varbinary. 
+ */ + private Type toFileTrinoType(Type type) + { + if (isGeospatialType(type)) { + return VarbinaryType.VARBINARY; + } + if (type instanceof ArrayType arrayType) { + return new ArrayType(toFileTrinoType(arrayType.getElementType())); + } + if (type instanceof MapType mapType) { + return new MapType(toFileTrinoType(mapType.getKeyType()), toFileTrinoType(mapType.getValueType()), typeManager.getTypeOperators()); + } + if (type instanceof RowType rowType) { + return RowType.from(rowType.getFields().stream() + .map(field -> new RowType.Field(field.getName(), toFileTrinoType(field.getType()))) + .collect(toImmutableList())); + } + return type; + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index fe94a28f4dcd..9e5e66ddfd22 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -298,6 +298,7 @@ import static io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity; import static io.trino.plugin.iceberg.ExpressionConverter.isConvertibleToIcebergExpression; import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression; +import static io.trino.plugin.iceberg.GeoSpatialUtils.isGeospatialType; import static io.trino.plugin.iceberg.IcebergAnalyzeProperties.getColumnNames; import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_DATA; import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_SPEC_ID; @@ -3428,7 +3429,9 @@ private TableStatisticsMetadata getStatisticsCollectionMetadata( .filter(column -> !column.isHidden()) .filter(column -> { io.trino.spi.type.Type type = column.getType(); - return !(type instanceof MapType || type instanceof ArrayType || type instanceof RowType); // is scalar type + // Geometry and Geography are excluded because Iceberg 
doesn't support geospatial statistics + return !(type instanceof MapType || type instanceof ArrayType || type instanceof RowType + || isGeospatialType(type)); }) .filter(column -> column.getType() != VARIANT) // variant does not support NDV statistics .map(ColumnMetadata::getName) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java index 8b51e6f3b3a1..a72ea5065189 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; import io.airlift.json.JsonCodec; import io.airlift.log.Logger; @@ -21,18 +22,28 @@ import io.airlift.units.DataSize; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; +import io.trino.geospatial.serde.JtsGeometrySerde; import io.trino.plugin.iceberg.PartitionTransforms.ColumnTransform; import io.trino.spi.Page; import io.trino.spi.PageIndexer; import io.trino.spi.PageIndexerFactory; import io.trino.spi.PageSorter; import io.trino.spi.TrinoException; +import io.trino.spi.block.ArrayBlockBuilder; import io.trino.spi.block.Block; +import io.trino.spi.block.BlockBuilder; +import io.trino.spi.block.MapBlockBuilder; +import io.trino.spi.block.RowBlockBuilder; +import io.trino.spi.block.SqlMap; +import io.trino.spi.block.SqlRow; import io.trino.spi.connector.ConnectorPageSink; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SortOrder; +import io.trino.spi.type.ArrayType; import io.trino.spi.type.DecimalType; import io.trino.spi.type.LongTimestamp; +import io.trino.spi.type.MapType; +import io.trino.spi.type.RowType; import io.trino.spi.type.Type; import 
io.trino.spi.type.TypeManager; import io.trino.spi.type.VarbinaryType; @@ -62,6 +73,9 @@ import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.slice.Slices.wrappedBuffer; +import static io.trino.geospatial.serde.JtsGeometrySerde.OGC_CRS84_SRID; +import static io.trino.geospatial.serde.JtsGeometrySerde.ewkbToWkb; +import static io.trino.geospatial.serde.JtsGeometrySerde.extractSrid; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_TOO_MANY_OPEN_PARTITIONS; import static io.trino.plugin.iceberg.IcebergSessionProperties.isSortedWritingEnabled; @@ -72,6 +86,7 @@ import static io.trino.plugin.iceberg.util.Timestamps.timestampToNanos; import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros; import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToNanos; +import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT; import static io.trino.spi.block.RowBlock.getRowFieldsFromBlock; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; @@ -90,6 +105,7 @@ import static io.trino.spi.type.TinyintType.TINYINT; import static io.trino.spi.type.UuidType.UUID; import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid; +import static io.trino.spi.type.VarbinaryType.VARBINARY; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; @@ -126,6 +142,8 @@ public class IcebergPageSink private final List columnTypes; private final List sortColumnIndexes; private final List sortOrders; + // Maps column index to top-level types that contain geometry + private final Map columnsWithGeometry; private final List writers = new ArrayList<>(); private final List closedWriterRollbackActions = new ArrayList<>(); @@ -183,6 +201,17 @@ public IcebergPageSink( 
.map(IcebergColumnHandle::getType) .collect(toImmutableList()); + // Build mapping of top-level columns that contain geometry + ImmutableMap.Builder columnsWithGeometry = ImmutableMap.builder(); + List columns = outputSchema.columns(); + for (int i = 0; i < columns.size(); i++) { + Types.NestedField field = columns.get(i); + if (containsGeometry(field.type())) { + columnsWithGeometry.put(i, field.type()); + } + } + this.columnsWithGeometry = columnsWithGeometry.buildOrThrow(); + this.tempDirectory = sortedWritingLocalStagingPath .map(path -> path.replace("${USER}", session.getIdentity().getUser())) .map(IcebergPageSink::createLocalSchemeIfAbsent) @@ -318,6 +347,9 @@ private void writePage(Page page) long currentWritten = writer.getWrittenBytes(); long currentMemory = writer.getMemoryUsage(); + // Transform geometry columns: validate SRID and convert EWKB to WKB + pageForWriter = transformGeometryColumnsForWrite(pageForWriter); + writer.appendRows(pageForWriter); writtenBytes += (writer.getWrittenBytes() - currentWritten); @@ -327,6 +359,121 @@ private void writePage(Page page) } } + /** + * Transform geometry columns for write: validate SRID and convert EWKB to WKB. + * Returns the page with geometry columns transformed, or the original page if no geometry columns. 
+ */ + private Page transformGeometryColumnsForWrite(Page page) + { + if (columnsWithGeometry.isEmpty()) { + return page; + } + + Block[] blocks = new Block[page.getChannelCount()]; + for (int channel = 0; channel < page.getChannelCount(); channel++) { + Block block = page.getBlock(channel); + org.apache.iceberg.types.Type icebergType = columnsWithGeometry.get(channel); + if (icebergType != null) { + block = transformGeometryBlockForWrite(columnTypes.get(channel), icebergType, block, channel); + } + blocks[channel] = block; + } + return new Page(page.getPositionCount(), blocks); + } + + private static boolean containsGeometry(org.apache.iceberg.types.Type type) + { + return switch (type.typeId()) { + case GEOMETRY -> true; + case LIST -> containsGeometry(type.asListType().elementType()); + case MAP -> containsGeometry(type.asMapType().keyType()) || containsGeometry(type.asMapType().valueType()); + case STRUCT -> type.asStructType().fields().stream().anyMatch(field -> containsGeometry(field.type())); + default -> false; + }; + } + + private static Block transformGeometryBlockForWrite(Type trinoType, org.apache.iceberg.types.Type icebergType, Block block, int columnIndex) + { + BlockBuilder builder = trinoType.createBlockBuilder(null, block.getPositionCount()); + for (int position = 0; position < block.getPositionCount(); position++) { + appendTransformedGeometryValueForWrite(trinoType, icebergType, block, position, builder, columnIndex); + } + return builder.build(); + } + + private static void appendTransformedGeometryValueForWrite(Type trinoType, org.apache.iceberg.types.Type icebergType, Block block, int position, BlockBuilder builder, int columnIndex) + { + if (block.isNull(position)) { + builder.appendNull(); + return; + } + + if (icebergType instanceof Types.GeometryType geometryType) { + Slice ewkb = trinoType.getSlice(block, position); + int sourceSrid = extractSrid(ewkb); + int targetSrid = getGeometrySrid(geometryType); + + // Validate SRID: fail only if 
both source and target are non-zero and different + if (sourceSrid != 0 && targetSrid != 0 && sourceSrid != targetSrid) { + throw new TrinoException(INVALID_FUNCTION_ARGUMENT, + "SRID mismatch: cannot write geometry with SRID %d into column %d with SRID %d".formatted(sourceSrid, columnIndex, targetSrid)); + } + + // Strip SRID from EWKB to produce standard WKB for storage + Slice wkb = ewkbToWkb(ewkb); + VARBINARY.writeSlice(builder, wkb); + return; + } + + if (trinoType instanceof ArrayType arrayType && icebergType instanceof Types.ListType listType) { + Block arrayBlock = arrayType.getObject(block, position); + ((ArrayBlockBuilder) builder).buildEntry(elementBuilder -> { + for (int i = 0; i < arrayBlock.getPositionCount(); i++) { + appendTransformedGeometryValueForWrite(arrayType.getElementType(), listType.elementType(), arrayBlock, i, elementBuilder, columnIndex); + } + }); + return; + } + + if (trinoType instanceof MapType mapType && icebergType instanceof Types.MapType mapIcebergType) { + SqlMap sqlMap = mapType.getObject(block, position); + int rawOffset = sqlMap.getRawOffset(); + ((MapBlockBuilder) builder).buildEntry((keyBuilder, valueBuilder) -> { + for (int i = 0; i < sqlMap.getSize(); i++) { + int rawPosition = rawOffset + i; + appendTransformedGeometryValueForWrite(mapType.getKeyType(), mapIcebergType.keyType(), sqlMap.getRawKeyBlock(), rawPosition, keyBuilder, columnIndex); + appendTransformedGeometryValueForWrite(mapType.getValueType(), mapIcebergType.valueType(), sqlMap.getRawValueBlock(), rawPosition, valueBuilder, columnIndex); + } + }); + return; + } + + if (trinoType instanceof RowType rowType && icebergType instanceof Types.StructType structType) { + SqlRow sqlRow = rowType.getObject(block, position); + int rawIndex = sqlRow.getRawIndex(); + ((RowBlockBuilder) builder).buildEntry(fieldBuilders -> { + for (int fieldIndex = 0; fieldIndex < rowType.getFields().size(); fieldIndex++) { + appendTransformedGeometryValueForWrite( + 
rowType.getFields().get(fieldIndex).getType(), + structType.fields().get(fieldIndex).type(), + sqlRow.getRawFieldBlock(fieldIndex), + rawIndex, + fieldBuilders.get(fieldIndex), + columnIndex); + } + }); + return; + } + + builder.append(block.getUnderlyingValueBlock(), block.getUnderlyingValuePosition(position)); + } + + private static int getGeometrySrid(Types.GeometryType geometryType) + { + String crs = geometryType.crs(); + return (crs == null) ? OGC_CRS84_SRID : JtsGeometrySerde.crsToSrid(crs); + } + private int[] getWriterIndexes(Page page) { int[] writerIndexes = pagePartitioner.partitionPage(page); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index a3aeb78de8c0..33673df6d6b2 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -25,6 +25,7 @@ import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoInput; import io.trino.filesystem.TrinoInputFile; +import io.trino.geospatial.serde.JtsGeometrySerde; import io.trino.memory.context.AggregatedMemoryContext; import io.trino.orc.OrcColumn; import io.trino.orc.OrcCorruptionException; @@ -62,11 +63,17 @@ import io.trino.plugin.iceberg.system.files.FilesTableSplit; import io.trino.spi.Page; import io.trino.spi.TrinoException; +import io.trino.spi.block.ArrayBlockBuilder; import io.trino.spi.block.Block; +import io.trino.spi.block.BlockBuilder; import io.trino.spi.block.IntArrayBlock; import io.trino.spi.block.LongArrayBlock; +import io.trino.spi.block.MapBlockBuilder; import io.trino.spi.block.RowBlock; +import io.trino.spi.block.RowBlockBuilder; import io.trino.spi.block.RunLengthEncodedBlock; +import io.trino.spi.block.SqlMap; +import io.trino.spi.block.SqlRow; import 
io.trino.spi.block.VariableWidthBlock; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorPageSource; @@ -145,6 +152,8 @@ import static io.airlift.slice.SizeOf.instanceSize; import static io.airlift.slice.SizeOf.sizeOf; import static io.airlift.slice.Slices.utf8Slice; +import static io.trino.geospatial.serde.JtsGeometrySerde.OGC_CRS84_SRID; +import static io.trino.geospatial.serde.JtsGeometrySerde.wkbToEwkb; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.trino.orc.OrcReader.INITIAL_BATCH_SIZE; import static io.trino.orc.OrcReader.ProjectedLayout; @@ -155,6 +164,8 @@ import static io.trino.parquet.predicate.PredicateUtils.getFilteredRowGroups; import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createDataSource; import static io.trino.plugin.iceberg.ColumnIdentity.TypeCategory.PRIMITIVE; +import static io.trino.plugin.iceberg.GeoSpatialUtils.isGeometryType; +import static io.trino.plugin.iceberg.GeoSpatialUtils.isGeospatialType; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CURSOR_ERROR; @@ -193,6 +204,7 @@ import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone; import static io.trino.spi.type.TimeZoneKey.UTC_KEY; +import static io.trino.spi.type.VarbinaryType.VARBINARY; import static java.lang.Math.addExact; import static java.lang.Math.min; import static java.lang.Math.toIntExact; @@ -630,6 +642,7 @@ private ReaderPageSourceWithRowPositions createDataPageSource( partition, dataColumns, partitionKeys, + typeManager, dataSequenceNumber, fileFirstRowId); }; @@ -807,13 +820,15 @@ else if (column.isLastUpdatedSequenceNumberColumn()) { transforms.transform(new DataSequenceNumberTransform(dataSequenceNumber, ordinal)); } 
else if (column.isBaseColumn()) { - transforms.column(ordinal); + transforms.column(ordinal, getSridInjectionTransform(column, tableSchema)); } else { - transforms.dereferenceField(ImmutableList.builder() - .add(ordinal) - .addAll(applyProjection(column, baseColumn)) - .build()); + transforms.dereferenceField( + ImmutableList.builder() + .add(ordinal) + .addAll(applyProjection(column, baseColumn)) + .build(), + getSridInjectionTransform(column, tableSchema)); } } } @@ -914,7 +929,34 @@ private static Type getOrcReadType(Type columnType, TypeManager typeManager) .map(field -> new RowType.Field(field.getName(), getOrcReadType(field.getType(), typeManager))) .collect(toImmutableList())); } + // Geometry/Geography are stored as binary, read as varbinary then transform + if (isGeospatialType(columnType)) { + return VARBINARY; + } + + return columnType; + } + /** + * Get the type to use for reading from file formats. + * Geometry/Geography are stored as binary, so we read as varbinary and transform. 
+ */ + private static Type getFileReadType(Type columnType, TypeManager typeManager) + { + if (columnType instanceof ArrayType arrayType) { + return new ArrayType(getFileReadType(arrayType.getElementType(), typeManager)); + } + if (columnType instanceof MapType mapType) { + return new MapType(getFileReadType(mapType.getKeyType(), typeManager), getFileReadType(mapType.getValueType(), typeManager), typeManager.getTypeOperators()); + } + if (columnType instanceof RowType rowType) { + return RowType.from(rowType.getFields().stream() + .map(field -> new RowType.Field(field.getName(), getFileReadType(field.getType(), typeManager))) + .collect(toImmutableList())); + } + if (isGeospatialType(columnType)) { + return VARBINARY; + } return columnType; } @@ -1138,13 +1180,15 @@ else if (column.isLastUpdatedSequenceNumberColumn()) { transforms.transform(new DataSequenceNumberTransform(dataSequenceNumber, ordinal)); } else if (column.isBaseColumn()) { - transforms.column(ordinal); + transforms.column(ordinal, getSridInjectionTransform(column, tableSchema)); } else { - transforms.dereferenceField(ImmutableList.builder() - .add(ordinal) - .addAll(applyProjection(column, baseColumn)) - .build()); + transforms.dereferenceField( + ImmutableList.builder() + .add(ordinal) + .addAll(applyProjection(column, baseColumn)) + .build(), + getSridInjectionTransform(column, tableSchema)); } } } @@ -1261,6 +1305,7 @@ private static ReaderPageSourceWithRowPositions createAvroPageSource( String partition, List columns, Map> partitionKeys, + TypeManager typeManager, long dataSequenceNumber, OptionalLong fileFirstRowId) { @@ -1352,7 +1397,7 @@ else if (column.isLastUpdatedSequenceNumberColumn()) { baseColumnIdToOrdinal.put(baseColumn.getId(), ordinal); columnNames.add(getAvroColumnName(baseColumn)); - columnTypes.add(baseColumn.getType()); + columnTypes.add(getFileReadType(baseColumn.getType(), typeManager)); } if (column.isRowIdColumn() && fileFirstRowId.isPresent()) { @@ -1363,13 +1408,15 @@ else 
if (column.isLastUpdatedSequenceNumberColumn()) { transforms.transform(new DataSequenceNumberTransform(dataSequenceNumber, ordinal)); } else if (column.isBaseColumn()) { - transforms.column(ordinal); + transforms.column(ordinal, getSridInjectionTransform(column, fileSchema)); } else { - transforms.dereferenceField(ImmutableList.builder() - .add(ordinal) - .addAll(applyProjection(column, baseColumn)) - .build()); + transforms.dereferenceField( + ImmutableList.builder() + .add(ordinal) + .addAll(applyProjection(column, baseColumn)) + .build(), + getSridInjectionTransform(column, fileSchema)); } } } @@ -1659,6 +1706,126 @@ private static TrinoException handleException(ParquetDataSourceId dataSourceId, return new TrinoException(ICEBERG_CURSOR_ERROR, format("Failed to read Parquet file: %s", dataSourceId), exception); } + /** + * Get a transform to inject SRID into geometry columns. + * For geometry columns, reads the CRS from the Iceberg schema and converts WKB to EWKB with the SRID. + * For non-geometry columns, returns Optional.empty() (no transform needed). + */ + private static Optional> getSridInjectionTransform(IcebergColumnHandle column, Schema tableSchema) + { + if (!containsGeometry(column.getType())) { + return Optional.empty(); + } + + // Find the field in the schema to get the CRS + Types.NestedField field = tableSchema.findField(column.getId()); + if (field == null || !containsGeometry(field.type())) { + return Optional.empty(); + } + + return Optional.of(block -> injectSridIntoBlock(column.getType(), field.type(), block)); + } + + /** + * Transform a Block of WKB bytes to a Block of EWKB bytes by injecting the SRID. 
+ */ + private static Block injectSridIntoBlock(Type trinoType, org.apache.iceberg.types.Type icebergType, Block block) + { + BlockBuilder builder = trinoType.createBlockBuilder(null, block.getPositionCount()); + for (int position = 0; position < block.getPositionCount(); position++) { + appendTransformedGeometryValueForRead(trinoType, icebergType, block, position, builder); + } + return builder.build(); + } + + private static boolean containsGeometry(Type type) + { + if (isGeometryType(type)) { + return true; + } + if (type instanceof ArrayType arrayType) { + return containsGeometry(arrayType.getElementType()); + } + if (type instanceof MapType mapType) { + return containsGeometry(mapType.getKeyType()) || containsGeometry(mapType.getValueType()); + } + if (type instanceof RowType rowType) { + return rowType.getFields().stream().anyMatch(field -> containsGeometry(field.getType())); + } + return false; + } + + private static boolean containsGeometry(org.apache.iceberg.types.Type type) + { + return switch (type.typeId()) { + case GEOMETRY -> true; + case LIST -> containsGeometry(type.asListType().elementType()); + case MAP -> containsGeometry(type.asMapType().keyType()) || containsGeometry(type.asMapType().valueType()); + case STRUCT -> type.asStructType().fields().stream().anyMatch(field -> containsGeometry(field.type())); + default -> false; + }; + } + + private static void appendTransformedGeometryValueForRead(Type trinoType, org.apache.iceberg.types.Type icebergType, Block block, int position, BlockBuilder builder) + { + if (block.isNull(position)) { + builder.appendNull(); + return; + } + + if (isGeometryType(trinoType) && icebergType instanceof Types.GeometryType geometryType) { + trinoType.writeSlice(builder, wkbToEwkb(VARBINARY.getSlice(block, position), getGeometrySrid(geometryType))); + return; + } + + if (trinoType instanceof ArrayType arrayType && icebergType instanceof Types.ListType listType) { + Block arrayBlock = arrayType.getObject(block, position); 
+ ((ArrayBlockBuilder) builder).buildEntry(elementBuilder -> { + for (int i = 0; i < arrayBlock.getPositionCount(); i++) { + appendTransformedGeometryValueForRead(arrayType.getElementType(), listType.elementType(), arrayBlock, i, elementBuilder); + } + }); + return; + } + + if (trinoType instanceof MapType mapType && icebergType instanceof Types.MapType mapIcebergType) { + SqlMap sqlMap = mapType.getObject(block, position); + int rawOffset = sqlMap.getRawOffset(); + ((MapBlockBuilder) builder).buildEntry((keyBuilder, valueBuilder) -> { + for (int i = 0; i < sqlMap.getSize(); i++) { + int rawPosition = rawOffset + i; + appendTransformedGeometryValueForRead(mapType.getKeyType(), mapIcebergType.keyType(), sqlMap.getRawKeyBlock(), rawPosition, keyBuilder); + appendTransformedGeometryValueForRead(mapType.getValueType(), mapIcebergType.valueType(), sqlMap.getRawValueBlock(), rawPosition, valueBuilder); + } + }); + return; + } + + if (trinoType instanceof RowType rowType && icebergType instanceof Types.StructType structType) { + SqlRow sqlRow = rowType.getObject(block, position); + int rawIndex = sqlRow.getRawIndex(); + ((RowBlockBuilder) builder).buildEntry(fieldBuilders -> { + for (int fieldIndex = 0; fieldIndex < rowType.getFields().size(); fieldIndex++) { + appendTransformedGeometryValueForRead( + rowType.getFields().get(fieldIndex).getType(), + structType.fields().get(fieldIndex).type(), + sqlRow.getRawFieldBlock(fieldIndex), + rawIndex, + fieldBuilders.get(fieldIndex)); + } + }); + return; + } + + builder.append(block.getUnderlyingValueBlock(), block.getUnderlyingValuePosition(position)); + } + + private static int getGeometrySrid(Types.GeometryType geometryType) + { + String crs = geometryType.crs(); + return (crs == null) ? 
OGC_CRS84_SRID : JtsGeometrySerde.crsToSrid(crs); + } + public record ReaderPageSourceWithRowPositions( ConnectorPageSource pageSource, OptionalLong startRowPosition, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java index 51c3f15bd6b6..741a2c84ff39 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java @@ -34,6 +34,7 @@ import static io.airlift.slice.Slices.utf8Slice; import static io.trino.plugin.base.io.ByteBuffers.getWrappedBytes; +import static io.trino.plugin.iceberg.GeoSpatialUtils.isGeospatialType; import static io.trino.plugin.iceberg.util.Timestamps.timestampFromNanos; import static io.trino.plugin.iceberg.util.Timestamps.timestampToNanos; import static io.trino.plugin.iceberg.util.Timestamps.timestampTzFromMicros; @@ -143,6 +144,12 @@ public static Object convertTrinoValueToIceberg(io.trino.spi.type.Type type, Obj return trinoUuidToJavaUuid(((Slice) trinoNativeValue)); } + // Geometry and Geography should never reach here - they're excluded from + // predicate pushdown and statistics collection + if (isGeospatialType(type)) { + throw new UnsupportedOperationException("Geometry/Geography values cannot be converted for Iceberg expressions or statistics"); + } + throw new UnsupportedOperationException("Unsupported type: " + type); } @@ -213,6 +220,12 @@ public static Object convertIcebergValueToTrino(Type icebergType, Object value) return javaUuidToTrinoUuid((UUID) value); } + // Geometry and Geography should never reach here - they're excluded from + // partitioning and statistics + if (icebergType instanceof Types.GeometryType || icebergType instanceof Types.GeographyType) { + throw new UnsupportedOperationException("Geometry/Geography values cannot be converted from Iceberg partition or statistics values"); + } + throw 
new UnsupportedOperationException("Unsupported iceberg type: " + icebergType); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TypeConverter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TypeConverter.java index 1f14e88cd4ee..5b1cbba87828 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TypeConverter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TypeConverter.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg; import com.google.common.collect.ImmutableList; +import io.trino.geospatial.serde.JtsGeometrySerde; import io.trino.spi.TrinoException; import io.trino.spi.type.ArrayType; import io.trino.spi.type.BigintType; @@ -36,7 +37,9 @@ import io.trino.spi.type.UuidType; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; +import org.apache.iceberg.types.EdgeAlgorithm; import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.GeographyType; import java.util.ArrayList; import java.util.HashSet; @@ -47,6 +50,10 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.plugin.iceberg.GeoSpatialUtils.getGeometryType; +import static io.trino.plugin.iceberg.GeoSpatialUtils.getSphericalGeographyType; +import static io.trino.plugin.iceberg.GeoSpatialUtils.isGeometryType; +import static io.trino.plugin.iceberg.GeoSpatialUtils.isSphericalGeographyType; import static io.trino.spi.StandardErrorCode.DUPLICATE_COLUMN_NAME; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.type.TimeType.TIME_MICROS; @@ -58,6 +65,7 @@ import static io.trino.spi.type.VariantType.VARIANT; import static java.lang.String.format; import static java.util.Locale.ENGLISH; +import static org.apache.iceberg.types.EdgeAlgorithm.SPHERICAL; public final class TypeConverter { @@ -110,7 +118,30 @@ public static Type 
toTrinoType(org.apache.iceberg.types.Type type, TypeManager t case VARIANT: return VARIANT; case GEOMETRY: + Types.GeometryType geometryType = (Types.GeometryType) type; + String geometryCrs = geometryType.crs(); + if (geometryCrs != null) { + try { + JtsGeometrySerde.crsToSrid(geometryCrs); + } + catch (IllegalArgumentException e) { + throw new TrinoException(NOT_SUPPORTED, "Unsupported geometry CRS '%s'. Supported values are OGC:CRS84/CRS84 and positive EPSG codes.".formatted(geometryCrs), e); + } + } + return getGeometryType(typeManager); case GEOGRAPHY: + GeographyType geographyType = (GeographyType) type; + // Validate WGS84 (OGC:CRS84) with spherical algorithm + String crs = geographyType.crs(); + // CRS null is treated as WGS84 + if (crs != null && !crs.equalsIgnoreCase("OGC:CRS84") && !crs.equalsIgnoreCase("EPSG:4326")) { + throw new TrinoException(NOT_SUPPORTED, "Unsupported geography CRS '%s'. Only WGS84 (OGC:CRS84 or EPSG:4326) is supported.".formatted(crs)); + } + EdgeAlgorithm algorithm = geographyType.algorithm(); + if (algorithm != null && algorithm != SPHERICAL) { + throw new TrinoException(NOT_SUPPORTED, "Unsupported geography algorithm '%s'. Only 'SPHERICAL' is supported.".formatted(algorithm)); + } + return getSphericalGeographyType(typeManager); case UNKNOWN: break; } @@ -186,6 +217,14 @@ private static org.apache.iceberg.types.Type toIcebergTypeInternal(Type type, Op if (type instanceof MapType mapType) { return fromMap(mapType, columnIdentity, nextFieldId); } + if (isGeometryType(type)) { + // Default to OGC:CRS84 (WGS84) + return Types.GeometryType.of("OGC:CRS84"); + } + if (isSphericalGeographyType(type)) { + // Always WGS84 with spherical algorithm + return GeographyType.of("OGC:CRS84", SPHERICAL); + } if (type instanceof TimeType timeType) { throw new TrinoException(NOT_SUPPORTED, format("Time precision (%s) not supported for Iceberg. 
Use \"time(6)\" instead.", timeType.getPrecision())); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/HiveSchemaUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/HiveSchemaUtil.java index 0dca5351ff78..15c3a69b6af7 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/HiveSchemaUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/HiveSchemaUtil.java @@ -43,10 +43,10 @@ private static String convertToTypeString(Type type) case DATE -> "date"; case TIME, STRING, UUID -> "string"; case TIMESTAMP, TIMESTAMP_NANO -> "timestamp"; - case FIXED, BINARY -> "binary"; + case FIXED, BINARY, GEOMETRY, GEOGRAPHY -> "binary"; case DECIMAL -> "decimal(%s,%s)".formatted(((DecimalType) type).precision(), ((DecimalType) type).scale()); - case UNKNOWN, GEOMETRY, GEOGRAPHY -> throw new TrinoException(NOT_SUPPORTED, "Unsupported Iceberg type: " + type); case VARIANT -> "struct"; + case UNKNOWN -> throw new TrinoException(NOT_SUPPORTED, "Unsupported Iceberg type: " + type); case LIST -> "array<%s>".formatted(convert(type.asListType().elementType())); case MAP -> "map<%s,%s>".formatted(convert(type.asMapType().keyType()), convert(type.asMapType().valueType())); case STRUCT -> "struct<%s>".formatted(type.asStructType().fields().stream() diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/OrcTypeConverter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/OrcTypeConverter.java index 4813ebaa5c3c..8bf1f9f8c58a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/OrcTypeConverter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/OrcTypeConverter.java @@ -88,7 +88,7 @@ private static List toOrcType(int nextFieldTypeIndex, Type type, Map ImmutableList.of(new OrcType(OrcTypeKind.STRING, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes)); - 
case FIXED, BINARY -> ImmutableList.of(new OrcType(OrcTypeKind.BINARY, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes)); + case FIXED, BINARY, GEOMETRY, GEOGRAPHY -> ImmutableList.of(new OrcType(OrcTypeKind.BINARY, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes)); case DECIMAL -> { DecimalType decimalType = (DecimalType) type; yield ImmutableList.of(new OrcType(OrcTypeKind.DECIMAL, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.of(decimalType.precision()), Optional.of(decimalType.scale()), attributes)); @@ -101,7 +101,7 @@ private static List toOrcType(int nextFieldTypeIndex, Type type, Map toOrcVariantType(nextFieldTypeIndex, attributes); - case GEOMETRY, GEOGRAPHY, UNKNOWN -> throw new TrinoException(NOT_SUPPORTED, "Unsupported Iceberg type: " + type); + case UNKNOWN -> throw new TrinoException(NOT_SUPPORTED, "Unsupported Iceberg type: " + type); case STRUCT -> toOrcStructType(nextFieldTypeIndex, (StructType) type, attributes); case LIST -> toOrcListType(nextFieldTypeIndex, (ListType) type, attributes); case MAP -> toOrcMapType(nextFieldTypeIndex, (MapType) type, attributes); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java index 668ef2c7180f..76ce25994b80 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java @@ -19,12 +19,15 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import io.trino.Session; +import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.metastore.HiveMetastore; +import io.trino.plugin.geospatial.GeoPlugin; import io.trino.plugin.hive.HivePlugin; import 
io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.tpch.TpchPlugin; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.security.ConnectorIdentity; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; @@ -36,6 +39,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; @@ -51,7 +55,9 @@ import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.EdgeAlgorithm; import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.GeometryType; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -60,7 +66,9 @@ import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; +import java.util.Comparator; import java.util.List; +import java.util.Locale; import java.util.Optional; import java.util.UUID; @@ -107,6 +115,7 @@ protected QueryRunner createQueryRunner() queryRunner.installPlugin(new TpchPlugin()); queryRunner.createCatalog("tpch", "tpch"); + queryRunner.installPlugin(new GeoPlugin()); dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data"); dataDirectory.toFile().mkdirs(); @@ -1628,6 +1637,254 @@ void testOrcTimestampNanoFiltering() } } + private static Path latestMetadataJson(Path tableLocation) + throws IOException + { + Path metadataDir = tableLocation.resolve("metadata"); + try (var stream = Files.list(metadataDir)) { + return stream + .filter(path -> path.getFileName().toString().endsWith(".metadata.json")) + .max(Comparator.naturalOrder()) + .orElseThrow(() -> new IllegalStateException("No metadata.json found in " + metadataDir)); + } + } + + 
private long getFileSize(String dataFilePath) + throws IOException + { + return getFileSystemFactory(getQueryRunner()) + .create(ConnectorIdentity.ofUser("test")) + .newInputFile(Location.of(dataFilePath)) + .length(); + } + + @Test + void testGeometryTypeJsonSerialization() + { + GeometryType defaultGeomType = Types.GeometryType.of("OGC:CRS84"); + assertThat(defaultGeomType.crs()).isNull(); + + GeometryType epsg3857 = Types.GeometryType.of("EPSG:3857"); + assertThat(epsg3857.crs()).isEqualTo("EPSG:3857"); + + Schema schema = new Schema( + Types.NestedField.optional(1, "geom_default", Types.GeometryType.of("OGC:CRS84")), + Types.NestedField.optional(2, "geom_3857", Types.GeometryType.of("EPSG:3857"))); + String json = SchemaParser.toJson(schema); + + Schema parsed = SchemaParser.fromJson(json); + + Types.GeometryType parsedDefault = (Types.GeometryType) parsed.findField("geom_default").type(); + assertThat(parsedDefault.crs()).isNull(); + + Types.GeometryType parsed3857 = (Types.GeometryType) parsed.findField("geom_3857").type(); + assertThat(parsed3857.crs()).isEqualTo("EPSG:3857"); + } + + @Test + void testGeometryWithCustomSrid() + { + String hadoopTableName = "hadoop_geometry_srid_" + randomNameSuffix(); + Path hadoopTableLocation = dataDirectory.resolve(hadoopTableName); + + Schema schema = new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "geom", Types.GeometryType.of("EPSG:3857"))); + + new HadoopTables(new Configuration(false)).create( + schema, + PartitionSpec.unpartitioned(), + SortOrder.unsorted(), + ImmutableMap.of( + "format-version", "3", + "write.format.default", "PARQUET"), + hadoopTableLocation.toString()); + + String registered = "registered_geom_srid_" + randomNameSuffix(); + assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')" + .formatted(registered, hadoopTableLocation)); + + assertThat(query("SELECT * FROM " + registered)) + .returnsEmptyResult(); + + 
assertThat(query("DESCRIBE " + registered)) + .matches("VALUES (VARCHAR 'id', VARCHAR 'integer', VARCHAR '', VARCHAR ''), " + + "(VARCHAR 'geom', VARCHAR 'Geometry', VARCHAR '', VARCHAR '')"); + + assertUpdate("INSERT INTO " + registered + " VALUES (1, ST_SetSRID(ST_Point(1, 2), 3857))", 1); + + assertThat(query("SELECT ST_AsText(geom), ST_SRID(geom) FROM " + registered)) + .matches("VALUES (VARCHAR 'POINT (1 2)', 3857)"); + + assertUpdate("DROP TABLE " + registered); + } + + @Test + void testGeometryRoundTrip() + { + for (String format : List.of("PARQUET", "ORC", "AVRO")) { + try (TestTable table = newTrinoTable("test_geometry_roundtrip_" + format.toLowerCase(Locale.ROOT) + "_", + "(id INTEGER, geom geometry) WITH (format = '" + format + "', format_version = 3)")) { + assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, ST_Point(1.0, 2.0))", 1); + assertUpdate("INSERT INTO " + table.getName() + " VALUES (2, ST_GeometryFromText('POLYGON((0 0, 1 0, 1 1, 0 1, 0 0))'))", 1); + + assertThat(query("SELECT id, ST_AsText(geom) FROM " + table.getName() + " ORDER BY id")) + .matches("VALUES (1, VARCHAR 'POINT (1 2)'), (2, VARCHAR 'POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))')"); + + assertThat(query("SELECT ST_SRID(geom) FROM " + table.getName() + " WHERE id = 1")) + .matches("VALUES 4326"); + } + } + } + + @Test + void testNestedGeometryRoundTrip() + { + for (String format : ALL_FILE_FORMATS) { + for (NestedGeometryContainer container : nestedGeometryContainers()) { + try (TestTable table = newTrinoTable("test_nested_geometry_roundtrip_" + container.name() + "_" + format.toLowerCase(Locale.ROOT) + "_", + "(id INTEGER, payload " + container.columnType() + ") WITH (format = '" + format + "', format_version = 3)")) { + assertUpdate("INSERT INTO " + table.getName() + " VALUES " + + "(1, " + container.firstValue() + "), " + + "(2, " + container.secondValue() + ")", 2); + + assertThat(query("SELECT id, ST_AsText(" + container.geometryExpression() + "), ST_SRID(" + 
container.geometryExpression() + ") FROM " + table.getName() + " ORDER BY id")) + .matches("VALUES " + + "(1, VARCHAR 'POINT (1 2)', 4326), " + + "(2, VARCHAR 'POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))', 4326)"); + } + } + } + } + + private static List nestedGeometryContainers() + { + return List.of( + new NestedGeometryContainer( + "row", + "ROW(geom geometry)", + "CAST(ROW(ST_Point(1.0, 2.0)) AS ROW(geom geometry))", + "CAST(ROW(ST_GeometryFromText('POLYGON((0 0, 1 0, 1 1, 0 1, 0 0))')) AS ROW(geom geometry))", + "payload.geom"), + new NestedGeometryContainer( + "array", + "ARRAY(geometry)", + "ARRAY[ST_Point(1.0, 2.0)]", + "ARRAY[ST_GeometryFromText('POLYGON((0 0, 1 0, 1 1, 0 1, 0 0))')]", + "payload[1]"), + new NestedGeometryContainer( + "map", + "MAP(VARCHAR, geometry)", + "map(ARRAY['geom'], ARRAY[ST_Point(1.0, 2.0)])", + "map(ARRAY['geom'], ARRAY[ST_GeometryFromText('POLYGON((0 0, 1 0, 1 1, 0 1, 0 0))')])", + "payload['geom']")); + } + + private record NestedGeometryContainer( + String name, + String columnType, + String firstValue, + String secondValue, + String geometryExpression) {} + + @Test + void testGeographyRoundTrip() + { + for (String format : List.of("PARQUET", "ORC", "AVRO")) { + try (TestTable table = newTrinoTable("test_geography_roundtrip_" + format.toLowerCase(Locale.ROOT) + "_", + "(id INTEGER, geog sphericalgeography) WITH (format = '" + format + "', format_version = 3)")) { + assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, to_spherical_geography(ST_Point(-122.4194, 37.7749)))", 1); + + assertThat(query("SELECT id, ST_AsText(to_geometry(geog)) FROM " + table.getName())) + .matches("VALUES (1, VARCHAR 'POINT (-122.4194 37.7749)')"); + } + } + } + + @Test + void testWriteSridMismatchFails() + { + try (TestTable table = newTrinoTable("test_srid_mismatch_", + "(geom geometry) WITH (format = 'PARQUET', format_version = 3)")) { + assertThat(query("INSERT INTO " + table.getName() + " SELECT ST_SetSRID(ST_Point(1, 1), 3857)")) + .failure() + 
.hasMessageContaining("SRID mismatch"); + } + } + + @Test + void testWriteSridZeroAllowed() + { + try (TestTable table = newTrinoTable("test_srid_zero_", + "(geom geometry) WITH (format = 'PARQUET', format_version = 3)")) { + assertUpdate("INSERT INTO " + table.getName() + " SELECT ST_Point(1, 1)", 1); + assertUpdate("INSERT INTO " + table.getName() + " SELECT ST_SetSRID(ST_Point(2, 2), 4326)", 1); + + assertThat(query("SELECT count(*) FROM " + table.getName())) + .matches("VALUES BIGINT '2'"); + } + } + + @Test + void testUnsupportedGeographyAlgorithm() + { + String hadoopTableName = "hadoop_unsupported_algo_" + randomNameSuffix(); + Path hadoopTableLocation = dataDirectory.resolve(hadoopTableName); + + Schema schema = new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "geog", Types.GeographyType.of("OGC:CRS84", EdgeAlgorithm.VINCENTY))); + + new HadoopTables(new Configuration(false)).create( + schema, + PartitionSpec.unpartitioned(), + SortOrder.unsorted(), + ImmutableMap.of( + "format-version", "3", + "write.format.default", "PARQUET"), + hadoopTableLocation.toString()); + + String registered = "registered_unsupported_algo_" + randomNameSuffix(); + assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')" + .formatted(registered, hadoopTableLocation)); + + assertQueryFails( + "SELECT * FROM " + registered, + ".*Unsupported geography algorithm.*"); + + assertUpdate("DROP TABLE " + registered); + } + + @Test + void testUnsupportedGeographyCrs() + { + String hadoopTableName = "hadoop_unsupported_crs_" + randomNameSuffix(); + Path hadoopTableLocation = dataDirectory.resolve(hadoopTableName); + + Schema schema = new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "geog", Types.GeographyType.of("EPSG:3857", EdgeAlgorithm.SPHERICAL))); + + new HadoopTables(new Configuration(false)).create( + schema, + PartitionSpec.unpartitioned(), + 
SortOrder.unsorted(), + ImmutableMap.of( + "format-version", "3", + "write.format.default", "PARQUET"), + hadoopTableLocation.toString()); + + String registered = "registered_unsupported_crs_" + randomNameSuffix(); + assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')" + .formatted(registered, hadoopTableLocation)); + + assertQueryFails( + "SELECT * FROM " + registered, + ".*Unsupported geography CRS.*"); + + assertUpdate("DROP TABLE " + registered); + } + @Test void testOrcTimestampNanoWithTimeZoneFiltering() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestTypeConverter.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestTypeConverter.java new file mode 100644 index 000000000000..ee3cb5ea2369 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestTypeConverter.java @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg; + +import io.trino.metadata.TestingFunctionResolution; +import io.trino.plugin.geospatial.GeoPlugin; +import io.trino.spi.TrinoException; +import io.trino.spi.type.TypeManager; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +import static io.trino.plugin.geospatial.GeometryType.GEOMETRY; +import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestTypeConverter +{ + private static final TypeManager TESTING_TYPE_MANAGER = new TestingFunctionResolution(new GeoPlugin()) + .getPlannerContext() + .getTypeManager(); + + @Test + void testGeometryWithCustomSrid() + { + assertThat(toTrinoType(Types.GeometryType.of("EPSG:3857"), TESTING_TYPE_MANAGER)) + .isEqualTo(GEOMETRY); + } + + @Test + void testGeometryWithInvalidCrs() + { + assertThatThrownBy(() -> toTrinoType(Types.GeometryType.of("EPSG:0"), TESTING_TYPE_MANAGER)) + .isInstanceOfSatisfying(TrinoException.class, exception -> assertThat(exception.getErrorCode()).isEqualTo(NOT_SUPPORTED.toErrorCode())) + .hasMessageContaining("Unsupported geometry CRS 'EPSG:0'"); + } +} From 52e283bb0b6d358a501822365fc6a8b960173095 Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Wed, 8 Apr 2026 16:32:15 -0700 Subject: [PATCH 2/2] Add Parquet geospatial logical annotations for Iceberg --- .../parquet/ParquetMetadataConverter.java | 35 ++- .../parquet/TestParquetMetadataConverter.java | 25 ++ .../iceberg/IcebergFileWriterFactory.java | 4 +- .../util/IcebergParquetSchemaConverter.java | 241 ++++++++++++++++++ .../trino/plugin/iceberg/TestIcebergV3.java | 16 +- .../TestIcebergParquetSchemaConverter.java | 63 +++++ 6 files changed, 379 insertions(+), 5 deletions(-) create mode 100644 
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/IcebergParquetSchemaConverter.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/TestIcebergParquetSchemaConverter.java diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetMetadataConverter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetMetadataConverter.java index 89826e4453af..7d7d83510d9e 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetMetadataConverter.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/ParquetMetadataConverter.java @@ -25,6 +25,8 @@ import org.apache.parquet.format.DecimalType; import org.apache.parquet.format.Encoding; import org.apache.parquet.format.EnumType; +import org.apache.parquet.format.GeographyType; +import org.apache.parquet.format.GeometryType; import org.apache.parquet.format.IntType; import org.apache.parquet.format.JsonType; import org.apache.parquet.format.ListType; @@ -59,6 +61,7 @@ import java.util.List; import java.util.Optional; +import static java.util.Objects.requireNonNullElse; import static org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics; import static org.apache.parquet.schema.LogicalTypeAnnotation.BsonLogicalTypeAnnotation; import static org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; @@ -160,8 +163,18 @@ public static LogicalTypeAnnotation getLogicalTypeAnnotation(LogicalType type) case UUID -> uuidType(); case FLOAT16 -> float16Type(); case VARIANT -> variantType((byte) 1); - case GEOMETRY -> geometryType("OGC:CRS84"); - case GEOGRAPHY -> geographyType(); + case GEOMETRY -> { + GeometryType geometry = type.getGEOMETRY(); + yield geometryType(geometry.isSetCrs() ? geometry.getCrs() : LogicalTypeAnnotation.DEFAULT_CRS); + } + case GEOGRAPHY -> { + GeographyType geography = type.getGEOGRAPHY(); + yield geographyType( + geography.isSetCrs() ? 
geography.getCrs() : LogicalTypeAnnotation.DEFAULT_CRS, + geography.isSetAlgorithm() ? + org.apache.parquet.column.schema.EdgeInterpolationAlgorithm.valueOf(geography.getAlgorithm().name()) : + LogicalTypeAnnotation.DEFAULT_ALGO); + } }; } @@ -496,6 +509,24 @@ public Optional visit(IntervalLogicalTypeAnnotation type) return Optional.of(LogicalType.UNKNOWN(new NullType())); } + @Override + public Optional visit(LogicalTypeAnnotation.GeometryLogicalTypeAnnotation type) + { + GeometryType geometry = new GeometryType(); + geometry.setCrs(requireNonNullElse(type.getCrs(), LogicalTypeAnnotation.DEFAULT_CRS)); + return Optional.of(LogicalType.GEOMETRY(geometry)); + } + + @Override + public Optional visit(LogicalTypeAnnotation.GeographyLogicalTypeAnnotation type) + { + GeographyType geography = new GeographyType(); + geography.setCrs(requireNonNullElse(type.getCrs(), LogicalTypeAnnotation.DEFAULT_CRS)); + org.apache.parquet.column.schema.EdgeInterpolationAlgorithm algorithm = requireNonNullElse(type.getAlgorithm(), LogicalTypeAnnotation.DEFAULT_ALGO); + geography.setAlgorithm(org.apache.parquet.format.EdgeInterpolationAlgorithm.valueOf(algorithm.name())); + return Optional.of(LogicalType.GEOGRAPHY(geography)); + } + static TimeUnit convertUnit(LogicalTypeAnnotation.TimeUnit unit) { return switch (unit) { diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/TestParquetMetadataConverter.java b/lib/trino-parquet/src/test/java/io/trino/parquet/TestParquetMetadataConverter.java index f35c25abf6cf..266b38f53428 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/TestParquetMetadataConverter.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/TestParquetMetadataConverter.java @@ -14,13 +14,19 @@ package io.trino.parquet; import org.apache.parquet.column.statistics.BinaryStatistics; +import org.apache.parquet.format.LogicalType; import org.apache.parquet.format.Statistics; import org.apache.parquet.io.api.Binary; import org.junit.jupiter.api.Test; import 
java.util.Arrays; +import static io.trino.parquet.ParquetMetadataConverter.convertToLogicalType; +import static io.trino.parquet.ParquetMetadataConverter.getLogicalTypeAnnotation; import static io.trino.parquet.ParquetMetadataConverter.toParquetStatistics; +import static org.apache.parquet.schema.LogicalTypeAnnotation.DEFAULT_ALGO; +import static org.apache.parquet.schema.LogicalTypeAnnotation.geographyType; +import static org.apache.parquet.schema.LogicalTypeAnnotation.geometryType; import static org.assertj.core.api.Assertions.assertThat; class TestParquetMetadataConverter @@ -162,4 +168,23 @@ void testOnlyMaxOmittedWhenOnlyMaxExceedsLimit() assertThat(formatStats.isSetMax()).isFalse(); assertThat(formatStats.isSetMax_value()).isFalse(); } + + @Test + void testGeometryLogicalTypeRoundTrip() + { + LogicalType logicalType = convertToLogicalType(geometryType("EPSG:3857")); + + assertThat(logicalType.getGEOMETRY().getCrs()).isEqualTo("EPSG:3857"); + assertThat(getLogicalTypeAnnotation(logicalType)).isEqualTo(geometryType("EPSG:3857")); + } + + @Test + void testGeographyLogicalTypeRoundTrip() + { + LogicalType logicalType = convertToLogicalType(geographyType()); + + assertThat(logicalType.getGEOGRAPHY().getCrs()).isEqualTo("OGC:CRS84"); + assertThat(logicalType.getGEOGRAPHY().getAlgorithm().name()).isEqualTo("SPHERICAL"); + assertThat(getLogicalTypeAnnotation(logicalType)).isEqualTo(geographyType("OGC:CRS84", DEFAULT_ALGO)); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java index c7014e113ceb..8f003af0b599 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java @@ -88,6 +88,7 @@ import static io.trino.plugin.iceberg.IcebergUtil.getOrcBloomFilterFpp; import static 
io.trino.plugin.iceberg.IcebergUtil.getParquetBloomFilterColumns; import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; +import static io.trino.plugin.iceberg.util.IcebergParquetSchemaConverter.convert; import static io.trino.plugin.iceberg.util.OrcTypeConverter.toOrcType; import static io.trino.plugin.iceberg.util.PrimitiveTypeMapBuilder.makeTypeMap; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; @@ -95,7 +96,6 @@ import static java.util.Objects.requireNonNull; import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE; import static org.apache.iceberg.io.DeleteSchemaUtil.pathPosSchema; -import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert; public class IcebergFileWriterFactory { @@ -201,7 +201,7 @@ private IcebergFileWriter createParquetWriter( rollbackAction, fileColumnTypes, fileColumnNames, - convert(toFileSchema(icebergSchema), "table"), + convert(icebergSchema, "table"), makeTypeMap(fileColumnTypes, fileColumnNames), parquetWriterOptions, IntStream.range(0, fileColumnNames.size()).toArray(), diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/IcebergParquetSchemaConverter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/IcebergParquetSchemaConverter.java new file mode 100644 index 000000000000..fe269ee8ea70 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/IcebergParquetSchemaConverter.java @@ -0,0 +1,241 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.util; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.types.EdgeAlgorithm; +import org.apache.iceberg.types.Type.NestedType; +import org.apache.iceberg.types.Type.PrimitiveType; +import org.apache.iceberg.types.Type.TypeID; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types.DecimalType; +import org.apache.iceberg.types.Types.FixedType; +import org.apache.iceberg.types.Types.GeographyType; +import org.apache.iceberg.types.Types.GeometryType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.TimestampNanoType; +import org.apache.iceberg.types.Types.TimestampType; +import org.apache.iceberg.variants.Variant; +import org.apache.parquet.column.schema.EdgeInterpolationAlgorithm; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; + +// TODO: Remove this class once upstream Iceberg supports geometry/geography Parquet logical annotations. +public final class IcebergParquetSchemaConverter +{ + private static final LogicalTypeAnnotation STRING = LogicalTypeAnnotation.stringType(); + private static final LogicalTypeAnnotation DATE = LogicalTypeAnnotation.dateType(); + private static final LogicalTypeAnnotation TIME_MICROS = LogicalTypeAnnotation.timeType(false, TimeUnit.MICROS); + private static final LogicalTypeAnnotation TIMESTAMP_MICROS = LogicalTypeAnnotation.timestampType(false, TimeUnit.MICROS); + private static final LogicalTypeAnnotation TIMESTAMPTZ_MICROS = LogicalTypeAnnotation.timestampType(true, TimeUnit.MICROS); + private static final LogicalTypeAnnotation TIMESTAMP_NANOS = LogicalTypeAnnotation.timestampType(false, TimeUnit.NANOS); + private static final LogicalTypeAnnotation TIMESTAMPTZ_NANOS = LogicalTypeAnnotation.timestampType(true, TimeUnit.NANOS); + private static final String METADATA = "metadata"; + private static final String VALUE = "value"; + + private IcebergParquetSchemaConverter() {} + + public static MessageType convert(Schema schema, String name) + { + Types.MessageTypeBuilder builder = Types.buildMessage(); + for (NestedField field : schema.columns()) { + Type fieldType = field(field); + if (fieldType != null) { + builder.addField(fieldType); + } + } + return builder.named(AvroSchemaUtil.makeCompatibleName(name)); + } + + private static GroupType struct(StructType struct, Type.Repetition repetition, int id, String name) + { + Types.GroupBuilder builder = Types.buildGroup(repetition); + for (NestedField field : struct.fields()) { + Type fieldType = field(field); + if (fieldType != null) { + builder.addField(fieldType); + } + } + return builder.id(id).named(AvroSchemaUtil.makeCompatibleName(name)); + } + + private static Type field(NestedField field) + { + Type.Repetition repetition = field.isOptional() 
? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED; + int id = field.fieldId(); + String name = field.name(); + + if (field.type().typeId() == TypeID.UNKNOWN) { + return null; + } + if (field.type().isPrimitiveType()) { + return primitive(field.type().asPrimitiveType(), repetition, id, name); + } + if (field.type().isVariantType()) { + return variant(repetition, id, name); + } + + NestedType nested = field.type().asNestedType(); + if (nested.isStructType()) { + return struct(nested.asStructType(), repetition, id, name); + } + if (nested.isMapType()) { + return map(nested.asMapType(), repetition, id, name); + } + if (nested.isListType()) { + return list(nested.asListType(), repetition, id, name); + } + throw new UnsupportedOperationException("Can't convert unknown type: " + nested); + } + + private static GroupType list(ListType list, Type.Repetition repetition, int id, String name) + { + NestedField elementField = list.fields().get(0); + Type elementType = field(elementField); + checkArgument(elementType != null, "Cannot convert element Parquet: %s", elementField.type()); + + return Types.list(repetition) + .element(elementType) + .id(id) + .named(AvroSchemaUtil.makeCompatibleName(name)); + } + + private static GroupType map(MapType map, Type.Repetition repetition, int id, String name) + { + NestedField keyField = map.fields().get(0); + NestedField valueField = map.fields().get(1); + Type keyType = field(keyField); + checkArgument(keyType != null, "Cannot convert key Parquet: %s", keyField.type()); + Type valueType = field(valueField); + checkArgument(valueType != null, "Cannot convert value Parquet: %s", valueField.type()); + + return Types.map(repetition) + .key(keyType) + .value(valueType) + .id(id) + .named(AvroSchemaUtil.makeCompatibleName(name)); + } + + private static Type variant(Type.Repetition repetition, int id, String originalName) + { + String name = AvroSchemaUtil.makeCompatibleName(originalName); + return Types.buildGroup(repetition) + 
.as(LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION)) + .id(id) + .required(BINARY) + .named(METADATA) + .required(BINARY) + .named(VALUE) + .named(name); + } + + private static Type primitive(PrimitiveType primitive, Type.Repetition repetition, int id, String originalName) + { + String name = AvroSchemaUtil.makeCompatibleName(originalName); + return switch (primitive.typeId()) { + case BOOLEAN -> Types.primitive(BOOLEAN, repetition).id(id).named(name); + case INTEGER -> Types.primitive(INT32, repetition).id(id).named(name); + case LONG -> Types.primitive(INT64, repetition).id(id).named(name); + case FLOAT -> Types.primitive(FLOAT, repetition).id(id).named(name); + case DOUBLE -> Types.primitive(DOUBLE, repetition).id(id).named(name); + case DATE -> Types.primitive(INT32, repetition).as(DATE).id(id).named(name); + case TIME -> Types.primitive(INT64, repetition).as(TIME_MICROS).id(id).named(name); + case TIMESTAMP -> ((TimestampType) primitive).shouldAdjustToUTC() ? + Types.primitive(INT64, repetition).as(TIMESTAMPTZ_MICROS).id(id).named(name) : + Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).id(id).named(name); + case TIMESTAMP_NANO -> ((TimestampNanoType) primitive).shouldAdjustToUTC() ? 
+ Types.primitive(INT64, repetition).as(TIMESTAMPTZ_NANOS).id(id).named(name) : + Types.primitive(INT64, repetition).as(TIMESTAMP_NANOS).id(id).named(name); + case STRING -> Types.primitive(BINARY, repetition).as(STRING).id(id).named(name); + case BINARY -> Types.primitive(BINARY, repetition).id(id).named(name); + case FIXED -> { + FixedType fixed = (FixedType) primitive; + yield Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition) + .length(fixed.length()) + .id(id) + .named(name); + } + case DECIMAL -> { + DecimalType decimal = (DecimalType) primitive; + if (decimal.precision() <= 9) { + yield Types.primitive(INT32, repetition) + .as(LogicalTypeAnnotation.decimalType(decimal.scale(), decimal.precision())) + .id(id) + .named(name); + } + if (decimal.precision() <= 18) { + yield Types.primitive(INT64, repetition) + .as(LogicalTypeAnnotation.decimalType(decimal.scale(), decimal.precision())) + .id(id) + .named(name); + } + int minLength = TypeUtil.decimalRequiredBytes(decimal.precision()); + yield Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition) + .length(minLength) + .as(LogicalTypeAnnotation.decimalType(decimal.scale(), decimal.precision())) + .id(id) + .named(name); + } + case UUID -> Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition) + .length(16) + .as(LogicalTypeAnnotation.uuidType()) + .id(id) + .named(name); + case GEOMETRY -> { + GeometryType geometryType = (GeometryType) primitive; + yield Types.primitive(BINARY, repetition) + .as(LogicalTypeAnnotation.geometryType(geometryType.crs() == null ? 
GeometryType.DEFAULT_CRS : geometryType.crs())) + .id(id) + .named(name); + } + case GEOGRAPHY -> { + GeographyType geographyType = (GeographyType) primitive; + yield Types.primitive(BINARY, repetition) + .as(toParquetGeographyType(geographyType)) + .id(id) + .named(name); + } + default -> throw new UnsupportedOperationException("Unsupported type for Parquet: " + primitive); + }; + } + + private static LogicalTypeAnnotation toParquetGeographyType(GeographyType geographyType) + { + String crs = geographyType.crs(); + EdgeAlgorithm algorithm = geographyType.algorithm(); + if (crs == null && algorithm == null) { + return LogicalTypeAnnotation.geographyType(); + } + return LogicalTypeAnnotation.geographyType( + crs == null ? GeographyType.DEFAULT_CRS : crs, + algorithm == null ? LogicalTypeAnnotation.DEFAULT_ALGO : EdgeInterpolationAlgorithm.valueOf(algorithm.name())); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java index 76ce25994b80..1454ce990be6 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java @@ -22,6 +22,7 @@ import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.metastore.HiveMetastore; +import io.trino.parquet.metadata.ParquetMetadata; import io.trino.plugin.geospatial.GeoPlugin; import io.trino.plugin.hive.HivePlugin; import io.trino.plugin.iceberg.catalog.TrinoCatalog; @@ -79,6 +80,7 @@ import static io.trino.plugin.iceberg.IcebergTestUtils.SESSION; import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory; import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore; +import static io.trino.plugin.iceberg.IcebergTestUtils.getParquetFileMetadata; import static io.trino.plugin.iceberg.IcebergTestUtils.getTrinoCatalog; import static 
io.trino.plugin.iceberg.IcebergUtil.getLatestMetadataLocation; import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTable; @@ -86,6 +88,7 @@ import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.testing.TestingSession.testSessionBuilder; import static org.apache.iceberg.TableProperties.FORMAT_VERSION; +import static org.apache.parquet.schema.LogicalTypeAnnotation.geometryType; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.keycloak.util.JsonSerialization.mapper; @@ -1643,7 +1646,7 @@ private static Path latestMetadataJson(Path tableLocation) Path metadataDir = tableLocation.resolve("metadata"); try (var stream = Files.list(metadataDir)) { return stream - .filter(path -> path.getFileName().toString().endsWith(".metadata.json")) + .filter(path -> path.getFileName().toString().endsWith(".metadata.json")) .max(Comparator.naturalOrder()) .orElseThrow(() -> new IllegalStateException("No metadata.json found in " + metadataDir)); } @@ -1658,6 +1661,14 @@ private long getFileSize(String dataFilePath) .length(); } + private ParquetMetadata getOnlyParquetDataFileMetadata(String tableName) + { + BaseTable table = loadTable(tableName); + table.refresh(); + DataFile dataFile = getOnlyElement(table.currentSnapshot().addedDataFiles(table.io())); + return getParquetFileMetadata(fileSystemFactory.create(SESSION).newInputFile(Location.of(dataFile.location()))); + } + @Test void testGeometryTypeJsonSerialization() { @@ -1716,6 +1727,9 @@ void testGeometryWithCustomSrid() assertThat(query("SELECT ST_AsText(geom), ST_SRID(geom) FROM " + registered)) .matches("VALUES (VARCHAR 'POINT (1 2)', 3857)"); + assertThat(getOnlyParquetDataFileMetadata(registered).getFileMetaData().getSchema().getType("geom").asPrimitiveType().getLogicalTypeAnnotation()) + .isEqualTo(geometryType("EPSG:3857")); + assertUpdate("DROP TABLE " + registered); } diff --git 
a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/TestIcebergParquetSchemaConverter.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/TestIcebergParquetSchemaConverter.java new file mode 100644 index 000000000000..46b5e6e8b406 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/TestIcebergParquetSchemaConverter.java @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.util; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.junit.jupiter.api.Test; + +import static org.apache.parquet.schema.LogicalTypeAnnotation.geographyType; +import static org.apache.parquet.schema.LogicalTypeAnnotation.geometryType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.assertj.core.api.Assertions.assertThat; + +class TestIcebergParquetSchemaConverter +{ + @Test + void testGeospatialLogicalAnnotations() + { + Schema schema = new Schema( + Types.NestedField.optional(1, "geom_default", Types.GeometryType.crs84()), + Types.NestedField.optional(2, "geom_3857", Types.GeometryType.of("EPSG:3857")), + Types.NestedField.optional(3, "geog_default", Types.GeographyType.crs84()), + Types.NestedField.optional(4, "row_payload", 
Types.StructType.of( + Types.NestedField.optional(5, "geom", Types.GeometryType.crs84()))), + Types.NestedField.optional(6, "array_payload", Types.ListType.ofOptional(7, Types.GeometryType.of("EPSG:3857"))), + Types.NestedField.optional(8, "map_payload", Types.MapType.ofOptional(9, 10, Types.StringType.get(), Types.GeographyType.crs84()))); + + MessageType messageType = IcebergParquetSchemaConverter.convert(schema, "table"); + + assertGeometry(messageType.getType("geom_default").asPrimitiveType(), geometryType("OGC:CRS84")); + assertGeometry(messageType.getType("geom_3857").asPrimitiveType(), geometryType("EPSG:3857")); + assertGeography(messageType.getType("geog_default").asPrimitiveType(), geographyType()); + assertGeometry(messageType.getType("row_payload").asGroupType().getType("geom").asPrimitiveType(), geometryType("OGC:CRS84")); + assertGeometry(messageType.getType("array_payload").asGroupType().getType("list").asGroupType().getType("element").asPrimitiveType(), geometryType("EPSG:3857")); + assertGeography(messageType.getType("map_payload").asGroupType().getType("key_value").asGroupType().getType("value").asPrimitiveType(), geographyType()); + } + + private static void assertGeometry(PrimitiveType primitiveType, LogicalTypeAnnotation expectedAnnotation) + { + assertThat(primitiveType.getPrimitiveTypeName()).isEqualTo(BINARY); + assertThat(primitiveType.getLogicalTypeAnnotation()).isEqualTo(expectedAnnotation); + } + + private static void assertGeography(PrimitiveType primitiveType, LogicalTypeAnnotation expectedAnnotation) + { + assertThat(primitiveType.getPrimitiveTypeName()).isEqualTo(BINARY); + assertThat(primitiveType.getLogicalTypeAnnotation()).isEqualTo(expectedAnnotation); + } +}