diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index ae95980286c1..a14b58daea07 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import io.airlift.slice.Slice; import io.airlift.slice.SliceUtf8; +import io.airlift.slice.Slices; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.iceberg.catalog.IcebergTableOperations; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; @@ -54,6 +55,7 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Base64; import java.util.List; import java.util.Locale; import java.util.Map; @@ -98,7 +100,6 @@ import static java.lang.Float.parseFloat; import static java.lang.Long.parseLong; import static java.lang.String.format; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; import static org.apache.iceberg.LocationProviders.locationsFor; @@ -290,7 +291,7 @@ public static Object deserializePartitionValue(Type type, String valueString, St return value; } if (type.equals(VarbinaryType.VARBINARY)) { - return utf8Slice(valueString); + return Slices.wrappedBuffer(Base64.getDecoder().decode(valueString)); } if (type.equals(UuidType.UUID)) { return javaUuidToTrinoUuid(UUID.fromString(valueString)); @@ -341,7 +342,7 @@ public static Map> getPartitionKeys(FileScanTask scanT String partitionValue; if (type.typeId() == FIXED || type.typeId() == BINARY) { // this is safe because Iceberg PartitionData directly wraps the byte array - partitionValue = new String(((ByteBuffer) value).array(), UTF_8); + partitionValue = Base64.getEncoder().encodeToString(((ByteBuffer) value).array()); } else { partitionValue = value.toString(); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index f73d9e0bf42d..3d4283ca83e5 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -643,7 +643,7 @@ public void testCreatePartitionedTable() " 'a_short_decimal', " + " 'a_long_decimal', " + " 'a_varchar', " + - // " 'a_varbinary', " + TODO (https://github.com/trinodb/trino/issues/9755) this yields incorrect query results + " 'a_varbinary', " + " 'a_date', " + " 'a_time', " + " 'a_timestamp', " + @@ -812,7 +812,7 @@ public void testCreatePartitionedTable() String schema = getSession().getSchema().orElseThrow(); assertThat(query("SELECT column_name FROM information_schema.columns WHERE table_schema = '" + schema + "' AND table_name = 'test_partitioned_table$partitions' ")) .skippingTypesCheck() - .matches("VALUES 'partition', 'record_count', 'file_count', 'total_size', 'data'"); + .matches("VALUES 'partition', 'record_count', 'file_count', 'total_size'"); assertThat(query("SELECT " + " record_count," + " file_count, " + @@ -824,7 +824,7 @@ public void testCreatePartitionedTable() " partition.a_short_decimal, " + " partition.a_long_decimal, " + " partition.a_varchar, " + - " data.a_varbinary, " + // TODO (https://github.com/trinodb/trino/issues/9755) partition on varbinary + " partition.a_varbinary, " + " partition.a_date, " + " partition.a_time, " + " partition.a_timestamp, " + @@ -844,10 +844,7 @@ public void testCreatePartitionedTable() " CAST(1.0 AS decimal(5,2)), " + " CAST(11.0 AS decimal(38,20)), " + " VARCHAR 'onefsadfdsf', " + - // TODO (https://github.com/trinodb/trino/issues/9755) include in partitioning - (format == ORC - ? " CAST(ROW(NULL, NULL, 0) AS ROW(min varbinary, max varbinary, null_count bigint)), " - : " CAST(ROW(X'000102f0feff', X'000102f0feff', 0) AS ROW(min varbinary, max varbinary, null_count bigint)), ") + + " X'000102f0feff', " + " DATE '2021-07-24'," + " TIME '02:43:57.987654', " + " TIMESTAMP '2021-07-24 03:43:57.987654'," + @@ -866,10 +863,7 @@ public void testCreatePartitionedTable() " NULL, " + " NULL, " + " NULL, " + - // TODO (https://github.com/trinodb/trino/issues/9755) include in partitioning - (format == ORC - ? " NULL, " - : " CAST(ROW(NULL, NULL, 1) AS ROW(min varbinary, max varbinary, null_count bigint)), ") + + " NULL, " + " NULL, " + " NULL, " + " NULL, " + diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java index b2c2c6ccc1b2..ca36bf3bbf7d 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java @@ -349,16 +349,26 @@ public void testSparkReadsTrinoPartitionedTable(StorageFormat storageFormat) String baseTableName = "test_spark_reads_trino_partitioned_table_" + storageFormat; String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); + onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); - onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s')", trinoTableName, storageFormat)); - onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1001), ('b', 1002), ('c', 1003)", trinoTableName)); + onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _varbinary VARBINARY, _bigint BIGINT) WITH (partitioning = ARRAY['_string', '_varbinary'], format = '%s')", trinoTableName, storageFormat)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', X'0ff102f0feff', 1001), ('b', X'0ff102f0fefe', 1002), ('c', X'0ff102fdfeff', 1003)", trinoTableName)); + + Row row1 = row("b", new byte[]{15, -15, 2, -16, -2, -2}, 1002); + String selectByString = "SELECT * FROM %s WHERE _string = 'b'"; + assertThat(onTrino().executeQuery(format(selectByString, trinoTableName))) + .containsOnly(row1); + assertThat(onSpark().executeQuery(format(selectByString, sparkTableName))) + .containsOnly(row1); + + Row row2 = row("a", new byte[]{15, -15, 2, -16, -2, -1}, 1001); + String selectByVarbinary = "SELECT * FROM %s WHERE _varbinary = X'0ff102f0feff'"; + assertThat(onTrino().executeQuery(format(selectByVarbinary, trinoTableName))) + .containsOnly(row2); + // for now this fails on spark see https://github.com/apache/iceberg/issues/2934 + assertQueryFailure(() -> onSpark().executeQuery(format(selectByVarbinary, sparkTableName))) + .hasMessageContaining("Cannot convert bytes to SQL literal: java.nio.HeapByteBuffer[pos=0 lim=6 cap=6]"); - Row row = row("b", 1002); - String select = "SELECT * FROM %s WHERE _string = 'b'"; - assertThat(onTrino().executeQuery(format(select, trinoTableName))) - .containsOnly(row); - assertThat(onSpark().executeQuery(format(select, sparkTableName))) - .containsOnly(row); onTrino().executeQuery("DROP TABLE " + trinoTableName); } @@ -368,20 +378,29 @@ public void testTrinoReadsSparkPartitionedTable(StorageFormat storageFormat, int String baseTableName = "test_trino_reads_spark_partitioned_table_" + storageFormat; String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); + onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName); onSpark().executeQuery(format( - "CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG PARTITIONED BY (_string) TBLPROPERTIES ('write.format.default'='%s', 'format-version' = %s)", + "CREATE TABLE %s (_string STRING, _varbinary BINARY, _bigint BIGINT) USING ICEBERG PARTITIONED BY (_string, _varbinary) TBLPROPERTIES ('write.format.default'='%s', 'format-version' = %s)", sparkTableName, storageFormat, specVersion)); - onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1001), ('b', 1002), ('c', 1003)", sparkTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', X'0ff102f0feff', 1001), ('b', X'0ff102f0fefe', 1002), ('c', X'0ff102fdfeff', 1003)", sparkTableName)); - Row row = row("b", 1002); - String select = "SELECT * FROM %s WHERE _string = 'b'"; + Row row1 = row("a", new byte[]{15, -15, 2, -16, -2, -1}, 1001); + String select = "SELECT * FROM %s WHERE _string = 'a'"; assertThat(onSpark().executeQuery(format(select, sparkTableName))) - .containsOnly(row); + .containsOnly(row1); assertThat(onTrino().executeQuery(format(select, trinoTableName))) - .containsOnly(row); + .containsOnly(row1); + + Row row2 = row("c", new byte[]{15, -15, 2, -3, -2, -1}, 1003); + String selectByVarbinary = "SELECT * FROM %s WHERE _varbinary = X'0ff102fdfeff'"; + assertThat(onTrino().executeQuery(format(selectByVarbinary, trinoTableName))) + .containsOnly(row2); + // for now this fails on spark see https://github.com/apache/iceberg/issues/2934 + assertQueryFailure(() -> onSpark().executeQuery(format(selectByVarbinary, sparkTableName))) + .hasMessageContaining("Cannot convert bytes to SQL literal: java.nio.HeapByteBuffer[pos=0 lim=6 cap=6]"); onSpark().executeQuery("DROP TABLE " + sparkTableName); }