Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceUtf8;
import io.airlift.slice.Slices;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
Expand Down Expand Up @@ -54,6 +55,7 @@
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -98,7 +100,6 @@
import static java.lang.Float.parseFloat;
import static java.lang.Long.parseLong;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.iceberg.LocationProviders.locationsFor;
Expand Down Expand Up @@ -290,7 +291,7 @@ public static Object deserializePartitionValue(Type type, String valueString, St
return value;
}
if (type.equals(VarbinaryType.VARBINARY)) {
return utf8Slice(valueString);
return Slices.wrappedBuffer(Base64.getDecoder().decode(valueString));
}
if (type.equals(UuidType.UUID)) {
return javaUuidToTrinoUuid(UUID.fromString(valueString));
Expand Down Expand Up @@ -341,7 +342,7 @@ public static Map<Integer, Optional<String>> getPartitionKeys(FileScanTask scanT
String partitionValue;
if (type.typeId() == FIXED || type.typeId() == BINARY) {
// this is safe because Iceberg PartitionData directly wraps the byte array
partitionValue = new String(((ByteBuffer) value).array(), UTF_8);
partitionValue = Base64.getEncoder().encodeToString(((ByteBuffer) value).array());
}
else {
partitionValue = value.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ public void testCreatePartitionedTable()
" 'a_short_decimal', " +
" 'a_long_decimal', " +
" 'a_varchar', " +
// " 'a_varbinary', " + TODO (https://github.com/trinodb/trino/issues/9755) this yields incorrect query results
" 'a_varbinary', " +
" 'a_date', " +
" 'a_time', " +
" 'a_timestamp', " +
Expand Down Expand Up @@ -812,7 +812,7 @@ public void testCreatePartitionedTable()
String schema = getSession().getSchema().orElseThrow();
assertThat(query("SELECT column_name FROM information_schema.columns WHERE table_schema = '" + schema + "' AND table_name = 'test_partitioned_table$partitions' "))
.skippingTypesCheck()
.matches("VALUES 'partition', 'record_count', 'file_count', 'total_size', 'data'");
.matches("VALUES 'partition', 'record_count', 'file_count', 'total_size'");
assertThat(query("SELECT " +
" record_count," +
" file_count, " +
Expand All @@ -824,7 +824,7 @@ public void testCreatePartitionedTable()
" partition.a_short_decimal, " +
" partition.a_long_decimal, " +
" partition.a_varchar, " +
" data.a_varbinary, " + // TODO (https://github.com/trinodb/trino/issues/9755) partition on varbinary
" partition.a_varbinary, " +
" partition.a_date, " +
" partition.a_time, " +
" partition.a_timestamp, " +
Expand All @@ -844,10 +844,7 @@ public void testCreatePartitionedTable()
" CAST(1.0 AS decimal(5,2)), " +
" CAST(11.0 AS decimal(38,20)), " +
" VARCHAR 'onefsadfdsf', " +
// TODO (https://github.com/trinodb/trino/issues/9755) include in partitioning
(format == ORC
? " CAST(ROW(NULL, NULL, 0) AS ROW(min varbinary, max varbinary, null_count bigint)), "
: " CAST(ROW(X'000102f0feff', X'000102f0feff', 0) AS ROW(min varbinary, max varbinary, null_count bigint)), ") +
" X'000102f0feff', " +
" DATE '2021-07-24'," +
" TIME '02:43:57.987654', " +
" TIMESTAMP '2021-07-24 03:43:57.987654'," +
Expand All @@ -866,10 +863,7 @@ public void testCreatePartitionedTable()
" NULL, " +
" NULL, " +
" NULL, " +
// TODO (https://github.com/trinodb/trino/issues/9755) include in partitioning
(format == ORC
? " NULL, "
: " CAST(ROW(NULL, NULL, 1) AS ROW(min varbinary, max varbinary, null_count bigint)), ") +
" NULL, " +
" NULL, " +
" NULL, " +
" NULL, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,16 +349,26 @@ public void testSparkReadsTrinoPartitionedTable(StorageFormat storageFormat)
String baseTableName = "test_spark_reads_trino_partitioned_table_" + storageFormat;
String trinoTableName = trinoTableName(baseTableName);
String sparkTableName = sparkTableName(baseTableName);
onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName);

onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s')", trinoTableName, storageFormat));
onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1001), ('b', 1002), ('c', 1003)", trinoTableName));
onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _varbinary VARBINARY, _bigint BIGINT) WITH (partitioning = ARRAY['_string', '_varbinary'], format = '%s')", trinoTableName, storageFormat));
onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', X'0ff102f0feff', 1001), ('b', X'0ff102f0fefe', 1002), ('c', X'0ff102fdfeff', 1003)", trinoTableName));

Row row1 = row("b", new byte[]{15, -15, 2, -16, -2, -2}, 1002);
String selectByString = "SELECT * FROM %s WHERE _string = 'b'";
assertThat(onTrino().executeQuery(format(selectByString, trinoTableName)))
.containsOnly(row1);
assertThat(onSpark().executeQuery(format(selectByString, sparkTableName)))
.containsOnly(row1);

Row row2 = row("a", new byte[]{15, -15, 2, -16, -2, -1}, 1001);
String selectByVarbinary = "SELECT * FROM %s WHERE _varbinary = X'0ff102f0feff'";
assertThat(onTrino().executeQuery(format(selectByVarbinary, trinoTableName)))
.containsOnly(row2);
// For now this query fails on Spark; see https://github.com/apache/iceberg/issues/2934
assertQueryFailure(() -> onSpark().executeQuery(format(selectByVarbinary, sparkTableName)))
.hasMessageContaining("Cannot convert bytes to SQL literal: java.nio.HeapByteBuffer[pos=0 lim=6 cap=6]");

Row row = row("b", 1002);
String select = "SELECT * FROM %s WHERE _string = 'b'";
assertThat(onTrino().executeQuery(format(select, trinoTableName)))
.containsOnly(row);
assertThat(onSpark().executeQuery(format(select, sparkTableName)))
.containsOnly(row);
onTrino().executeQuery("DROP TABLE " + trinoTableName);
}

Expand All @@ -368,20 +378,29 @@ public void testTrinoReadsSparkPartitionedTable(StorageFormat storageFormat, int
String baseTableName = "test_trino_reads_spark_partitioned_table_" + storageFormat;
String trinoTableName = trinoTableName(baseTableName);
String sparkTableName = sparkTableName(baseTableName);
onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName);

onSpark().executeQuery(format(
"CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG PARTITIONED BY (_string) TBLPROPERTIES ('write.format.default'='%s', 'format-version' = %s)",
"CREATE TABLE %s (_string STRING, _varbinary BINARY, _bigint BIGINT) USING ICEBERG PARTITIONED BY (_string, _varbinary) TBLPROPERTIES ('write.format.default'='%s', 'format-version' = %s)",
sparkTableName,
storageFormat,
specVersion));
onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1001), ('b', 1002), ('c', 1003)", sparkTableName));
onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', X'0ff102f0feff', 1001), ('b', X'0ff102f0fefe', 1002), ('c', X'0ff102fdfeff', 1003)", sparkTableName));

Row row = row("b", 1002);
String select = "SELECT * FROM %s WHERE _string = 'b'";
Row row1 = row("a", new byte[]{15, -15, 2, -16, -2, -1}, 1001);
String select = "SELECT * FROM %s WHERE _string = 'a'";
assertThat(onSpark().executeQuery(format(select, sparkTableName)))
.containsOnly(row);
.containsOnly(row1);
assertThat(onTrino().executeQuery(format(select, trinoTableName)))
.containsOnly(row);
.containsOnly(row1);

Row row2 = row("c", new byte[]{15, -15, 2, -3, -2, -1}, 1003);
String selectByVarbinary = "SELECT * FROM %s WHERE _varbinary = X'0ff102fdfeff'";
assertThat(onTrino().executeQuery(format(selectByVarbinary, trinoTableName)))
.containsOnly(row2);
// For now this query fails on Spark; see https://github.com/apache/iceberg/issues/2934
assertQueryFailure(() -> onSpark().executeQuery(format(selectByVarbinary, sparkTableName)))
.hasMessageContaining("Cannot convert bytes to SQL literal: java.nio.HeapByteBuffer[pos=0 lim=6 cap=6]");

onSpark().executeQuery("DROP TABLE " + sparkTableName);
}
Expand Down