Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.locationtech.jts.io.WKBReader;
import org.locationtech.jts.io.WKBWriter;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -36,6 +38,10 @@
public final class JtsGeometrySerde
{
private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory();
public static final int OGC_CRS84_SRID = 4326;

// EWKB flag for SRID presence (bit 29)
private static final int EWKB_SRID_FLAG = 0x20000000;

// WKB type codes (2D)
private static final int WKB_POINT = 1;
Expand Down Expand Up @@ -184,4 +190,108 @@ public static Slice serializeBinaryOp(Geometry result, Geometry left, Geometry r
result.setSRID(validateAndGetSrid(left, right));
return serialize(result);
}

/**
* Extract SRID from EWKB without full parsing.
* Returns 0 if no SRID is embedded.
*/
public static int extractSrid(Slice ewkb)
{
if (ewkb.length() < 9) {
return 0;
}
boolean bigEndian = ewkb.getByte(0) == 0;
int type = ewkb.getInt(1);
if (bigEndian) {
type = Integer.reverseBytes(type);
}
if ((type & EWKB_SRID_FLAG) == 0) {
return 0;
}
int srid = ewkb.getInt(5);
Comment on lines +204 to +211
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we do a single getLong instead?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turned out more complex.

if (bigEndian) {
srid = Integer.reverseBytes(srid);
}
return srid;
}

/**
* Strip SRID from EWKB to produce standard WKB.
* If the input is already standard WKB (no SRID flag), returns it unchanged.
*/
public static Slice ewkbToWkb(Slice ewkb)
{
if (ewkb.length() < 9) {
return ewkb;
}
boolean bigEndian = ewkb.getByte(0) == 0;
int type = ewkb.getInt(1);
if (bigEndian) {
type = Integer.reverseBytes(type);
}
if ((type & EWKB_SRID_FLAG) == 0) {
return ewkb;
}

// Strip SRID flag and 4 SRID bytes
int newType = type & ~EWKB_SRID_FLAG;
Slice wkb = Slices.allocate(ewkb.length() - 4);
wkb.setByte(0, ewkb.getByte(0)); // endianness
wkb.setInt(1, bigEndian ? Integer.reverseBytes(newType) : newType);
wkb.setBytes(5, ewkb, 9, ewkb.length() - 9); // geometry data
return wkb;
}

/**
* Convert a CRS string to an SRID integer.
* Supports formats:
* - "EPSG:XXXX" → XXXX
* - "OGC:CRS84" or "CRS84" → 4326 (WGS84)
*/
public static int crsToSrid(String crs)
{
if (crs == null || crs.isEmpty()) {
return 0;
}
String upperCrs = crs.toUpperCase(ENGLISH);
if (upperCrs.equals("OGC:CRS84") || upperCrs.equals("CRS84")) {
return OGC_CRS84_SRID; // WGS84
}
if (upperCrs.startsWith("EPSG:")) {
try {
int srid = Integer.parseInt(crs.substring(5));
if (srid <= 0) {
throw new IllegalArgumentException("Invalid EPSG code: " + crs);
}
return srid;
}
catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid EPSG code: " + crs);
}
}
throw new IllegalArgumentException("Unsupported CRS format: " + crs);
}
Comment thread
dain marked this conversation as resolved.

/**
* Inject SRID into WKB to produce EWKB.
*/
public static Slice wkbToEwkb(Slice wkb, int srid)
{
checkArgument(wkb.length() >= 5, "WKB too short");
boolean bigEndian = wkb.getByte(0) == 0;
int type = wkb.getInt(1);
if (bigEndian) {
type = Integer.reverseBytes(type);
}
checkArgument((type & EWKB_SRID_FLAG) == 0, "Input already has SRID flag set (expected WKB, got EWKB)");

// Add SRID flag
int newType = type | EWKB_SRID_FLAG;
Slice ewkb = Slices.allocate(wkb.length() + 4);
ewkb.setByte(0, wkb.getByte(0)); // endianness
ewkb.setInt(1, bigEndian ? Integer.reverseBytes(newType) : newType);
ewkb.setInt(5, bigEndian ? Integer.reverseBytes(srid) : srid);
ewkb.setBytes(9, wkb, 5, wkb.length() - 5); // geometry data
return ewkb;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@
import static io.trino.geospatial.GeometryType.MULTI_POLYGON;
import static io.trino.geospatial.GeometryType.POINT;
import static io.trino.geospatial.GeometryType.POLYGON;
import static io.trino.geospatial.serde.JtsGeometrySerde.crsToSrid;
import static io.trino.geospatial.serde.JtsGeometrySerde.deserialize;
import static io.trino.geospatial.serde.JtsGeometrySerde.deserializeEnvelope;
import static io.trino.geospatial.serde.JtsGeometrySerde.deserializeType;
import static io.trino.geospatial.serde.JtsGeometrySerde.ewkbToWkb;
import static io.trino.geospatial.serde.JtsGeometrySerde.serialize;
import static io.trino.geospatial.serde.JtsGeometrySerde.wkbToEwkb;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -145,6 +148,31 @@ public void testGeometryCollectionSridRoundTrip()
testSerializationWithSrid("GEOMETRYCOLLECTION (POINT (1 2), LINESTRING (0 0, 1 2, 3 4))", 3857);
}

@Test
public void testCrsToSridRejectsNonPositiveEpsgCodes()
{
assertThat(crsToSrid("EPSG:3857")).isEqualTo(3857);
assertThatThrownBy(() -> crsToSrid("EPSG:0"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid EPSG code: EPSG:0");
assertThatThrownBy(() -> crsToSrid("EPSG:-3857"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid EPSG code: EPSG:-3857");
}

@Test
public void testWkbToEwkbRejectsEwkbInput()
{
Geometry geometry = createJtsGeometry("POINT (1 2)");
geometry.setSRID(3857);
Slice ewkb = serialize(geometry);

assertThat(ewkbToWkb(ewkb)).isNotEqualTo(ewkb);
assertThatThrownBy(() -> wkbToEwkb(ewkb, 3857))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Input already has SRID flag set (expected WKB, got EWKB)");
}

@Test
public void testEnvelope()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.parquet.format.DecimalType;
import org.apache.parquet.format.Encoding;
import org.apache.parquet.format.EnumType;
import org.apache.parquet.format.GeographyType;
import org.apache.parquet.format.GeometryType;
import org.apache.parquet.format.IntType;
import org.apache.parquet.format.JsonType;
import org.apache.parquet.format.ListType;
Expand Down Expand Up @@ -59,6 +61,7 @@
import java.util.List;
import java.util.Optional;

import static java.util.Objects.requireNonNullElse;
import static org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics;
import static org.apache.parquet.schema.LogicalTypeAnnotation.BsonLogicalTypeAnnotation;
import static org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
Expand Down Expand Up @@ -160,8 +163,18 @@ public static LogicalTypeAnnotation getLogicalTypeAnnotation(LogicalType type)
case UUID -> uuidType();
case FLOAT16 -> float16Type();
case VARIANT -> variantType((byte) 1);
case GEOMETRY -> geometryType("OGC:CRS84");
case GEOGRAPHY -> geographyType();
case GEOMETRY -> {
GeometryType geometry = type.getGEOMETRY();
yield geometryType(geometry.isSetCrs() ? geometry.getCrs() : LogicalTypeAnnotation.DEFAULT_CRS);
}
case GEOGRAPHY -> {
GeographyType geography = type.getGEOGRAPHY();
yield geographyType(
geography.isSetCrs() ? geography.getCrs() : LogicalTypeAnnotation.DEFAULT_CRS,
geography.isSetAlgorithm() ?
org.apache.parquet.column.schema.EdgeInterpolationAlgorithm.valueOf(geography.getAlgorithm().name()) :
LogicalTypeAnnotation.DEFAULT_ALGO);
}
};
}

Expand Down Expand Up @@ -496,6 +509,24 @@ public Optional<LogicalType> visit(IntervalLogicalTypeAnnotation type)
return Optional.of(LogicalType.UNKNOWN(new NullType()));
}

@Override
public Optional<LogicalType> visit(LogicalTypeAnnotation.GeometryLogicalTypeAnnotation type)
{
GeometryType geometry = new GeometryType();
geometry.setCrs(requireNonNullElse(type.getCrs(), LogicalTypeAnnotation.DEFAULT_CRS));
return Optional.of(LogicalType.GEOMETRY(geometry));
}

@Override
public Optional<LogicalType> visit(LogicalTypeAnnotation.GeographyLogicalTypeAnnotation type)
{
GeographyType geography = new GeographyType();
geography.setCrs(requireNonNullElse(type.getCrs(), LogicalTypeAnnotation.DEFAULT_CRS));
org.apache.parquet.column.schema.EdgeInterpolationAlgorithm algorithm = requireNonNullElse(type.getAlgorithm(), LogicalTypeAnnotation.DEFAULT_ALGO);
geography.setAlgorithm(org.apache.parquet.format.EdgeInterpolationAlgorithm.valueOf(algorithm.name()));
return Optional.of(LogicalType.GEOGRAPHY(geography));
}

static TimeUnit convertUnit(LogicalTypeAnnotation.TimeUnit unit)
{
return switch (unit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@
package io.trino.parquet;

import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.format.LogicalType;
import org.apache.parquet.format.Statistics;
import org.apache.parquet.io.api.Binary;
import org.junit.jupiter.api.Test;

import java.util.Arrays;

import static io.trino.parquet.ParquetMetadataConverter.convertToLogicalType;
import static io.trino.parquet.ParquetMetadataConverter.getLogicalTypeAnnotation;
import static io.trino.parquet.ParquetMetadataConverter.toParquetStatistics;
import static org.apache.parquet.schema.LogicalTypeAnnotation.DEFAULT_ALGO;
import static org.apache.parquet.schema.LogicalTypeAnnotation.geographyType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.geometryType;
import static org.assertj.core.api.Assertions.assertThat;

class TestParquetMetadataConverter
Expand Down Expand Up @@ -162,4 +168,23 @@ void testOnlyMaxOmittedWhenOnlyMaxExceedsLimit()
assertThat(formatStats.isSetMax()).isFalse();
assertThat(formatStats.isSetMax_value()).isFalse();
}

@Test
void testGeometryLogicalTypeRoundTrip()
{
LogicalType logicalType = convertToLogicalType(geometryType("EPSG:3857"));

assertThat(logicalType.getGEOMETRY().getCrs()).isEqualTo("EPSG:3857");
assertThat(getLogicalTypeAnnotation(logicalType)).isEqualTo(geometryType("EPSG:3857"));
}

@Test
void testGeographyLogicalTypeRoundTrip()
{
LogicalType logicalType = convertToLogicalType(geographyType());

assertThat(logicalType.getGEOGRAPHY().getCrs()).isEqualTo("OGC:CRS84");
assertThat(logicalType.getGEOGRAPHY().getAlgorithm().name()).isEqualTo("SPHERICAL");
assertThat(getLogicalTypeAnnotation(logicalType)).isEqualTo(geographyType("OGC:CRS84", DEFAULT_ALGO));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import static com.google.common.io.Resources.getResource;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.geospatial.serde.JtsGeometrySerde.serialize;
import static io.trino.plugin.geospatial.GeoTestUtils.spatiallyEquals;
import static io.trino.plugin.geospatial.GeometryType.GEOMETRY;
import static io.trino.plugin.geospatial.SphericalGeographyType.SPHERICAL_GEOGRAPHY;
import static io.trino.spi.type.DoubleType.DOUBLE;
Expand Down Expand Up @@ -91,11 +90,7 @@ public void testGetObjectValue()
}
Block block = builder.build();
for (int i = 0; i < wktList.size(); i++) {
String expected = wktList.get(i);
String actual = (String) SPHERICAL_GEOGRAPHY.getObjectValue(block, i);
assertThat(spatiallyEquals(expected, actual))
.withFailMessage("Geometry mismatch at index %d!\nExpected: %s\nActual: %s", i, expected, actual)
.isTrue();
assertThat(wktList.get(i)).isEqualTo(SPHERICAL_GEOGRAPHY.getObjectValue(block, i));
}
}

Expand Down
11 changes: 11 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@
<artifactId>trino-filesystem-s3</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-geospatial-toolkit</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hive</artifactId>
Expand Down Expand Up @@ -566,6 +571,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-geospatial</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hdfs</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static io.trino.plugin.hive.util.HiveUtil.isStructuralType;
import static io.trino.plugin.iceberg.GeoSpatialUtils.isGeospatialType;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static io.trino.plugin.iceberg.util.Timestamps.compareTimestampNanosToRange;
import static io.trino.plugin.iceberg.util.Timestamps.compareTimestampTzNanosToRange;
Expand Down Expand Up @@ -98,6 +99,11 @@ public static boolean isConvertibleToIcebergExpression(Domain domain)
return domain.isOnlyNull() || domain.getValues().isAll();
}

// Geometry and Geography types are not supported for predicate pushdown in Iceberg
if (isGeospatialType(domain.getType())) {
return false;
}

if (domain.getType() == UUID) {
// Iceberg orders UUID values differently than Trino (perhaps due to https://bugs.openjdk.org/browse/JDK-7025832), so allow only IS NULL / IS NOT NULL checks
return domain.isOnlyNull() || domain.getValues().isAll();
Expand Down
Loading
Loading