diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ColumnIdentity.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ColumnIdentity.java index 8a80ef5c7348..3d66bf4730d2 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ColumnIdentity.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ColumnIdentity.java @@ -148,9 +148,13 @@ public static ColumnIdentity primitiveColumnIdentity(int id, String name) } public static ColumnIdentity createColumnIdentity(Types.NestedField column) + { + return createColumnIdentity(column.name(), column); + } + + public static ColumnIdentity createColumnIdentity(String name, Types.NestedField column) { int id = column.fieldId(); - String name = column.name(); org.apache.iceberg.types.Type fieldType = column.type(); if (!fieldType.isNestedType()) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java index d9d68ee247fd..6205233e11fe 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java @@ -20,11 +20,15 @@ import com.google.common.collect.Iterables; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.type.Type; +import io.trino.spi.type.TypeManager; +import org.apache.iceberg.types.Types; import java.util.List; import java.util.Objects; import java.util.Optional; +import static io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity; +import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; import static java.util.Objects.requireNonNull; public class IcebergColumnHandle @@ -166,4 +170,24 @@ public String toString() { return getId() + ":" + getName() + ":" + type.getDisplayName(); } + + public static IcebergColumnHandle create(Types.NestedField column, TypeManager typeManager) + { + return new IcebergColumnHandle( + createColumnIdentity(column), + toTrinoType(column.type(), typeManager), + ImmutableList.of(), + toTrinoType(column.type(), typeManager), + Optional.ofNullable(column.doc())); + } + + public static IcebergColumnHandle create(String name, Types.NestedField column, TypeManager typeManager) + { + return new IcebergColumnHandle( + createColumnIdentity(name, column), + toTrinoType(column.type(), typeManager), + ImmutableList.of(), + toTrinoType(column.type(), typeManager), + Optional.ofNullable(column.doc())); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index fb4cae303f23..42f08a746f59 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -109,7 +109,7 @@ import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY; import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning; import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue; -import static io.trino.plugin.iceberg.IcebergUtil.getColumns; +import static io.trino.plugin.iceberg.IcebergUtil.getAllColumns; import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat; import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys; import static io.trino.plugin.iceberg.IcebergUtil.getTableComment; @@ -263,7 +263,7 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con DiscretePredicates discretePredicates = null; if (!partitionSourceIds.isEmpty()) { // Extract identity partition columns - Map columns = getColumns(icebergTable.schema(), typeManager).stream() + Map columns = getAllColumns(icebergTable.schema(), typeManager).stream() .filter(column -> partitionSourceIds.contains(column.getId())) .collect(toImmutableMap(IcebergColumnHandle::getId, Function.identity())); @@ -340,7 +340,7 @@ public Map getColumnHandles(ConnectorSession session, Conn { IcebergTableHandle table = (IcebergTableHandle) tableHandle; Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); - return getColumns(icebergTable.schema(), typeManager).stream() + return getAllColumns(icebergTable.schema(), typeManager).stream() .collect(toImmutableMap(IcebergColumnHandle::getName, identity())); } @@ -432,7 +432,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con tableMetadata.getTable().getTableName(), SchemaParser.toJson(transaction.table().schema()), PartitionSpecParser.toJson(transaction.table().spec()), - getColumns(transaction.table().schema(), typeManager), + getAllColumns(transaction.table().schema(), typeManager), transaction.table().location(), getFileFormat(transaction.table()), transaction.table().properties()); @@ -458,7 +458,7 @@ private Optional getWriteLayout(Schema tableSchema, Par return Optional.empty(); } - Map columnById = getColumns(tableSchema, typeManager).stream() + Map columnById = getAllColumns(tableSchema, typeManager).stream() .collect(toImmutableMap(IcebergColumnHandle::getId, identity())); List partitioningColumns = partitionSpec.fields().stream() @@ -492,7 +492,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto table.getTableName(), SchemaParser.toJson(icebergTable.schema()), PartitionSpecParser.toJson(icebergTable.spec()), - getColumns(icebergTable.schema(), typeManager), + getAllColumns(icebergTable.schema(), typeManager), icebergTable.location(), getFileFormat(icebergTable), icebergTable.properties()); @@ -914,7 +914,7 @@ public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession table.getTableName(), SchemaParser.toJson(icebergTable.schema()), PartitionSpecParser.toJson(icebergTable.spec()), - getColumns(icebergTable.schema(), typeManager), + getAllColumns(icebergTable.schema(), typeManager), icebergTable.location(), getFileFormat(icebergTable), icebergTable.properties()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java index 6d6cd4cc903a..c10aa3f8a232 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java @@ -47,11 +47,14 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.types.Types; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -66,6 +69,7 @@ import static io.trino.plugin.iceberg.PartitionTransforms.getColumnTransform; import static io.trino.plugin.iceberg.util.Timestamps.getTimestampTz; import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros; +import static io.trino.spi.block.ColumnarRow.toColumnarRow; import static io.trino.spi.type.Decimals.readBigDecimal; import static io.trino.spi.type.TimeType.TIME_MICROS; import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS; @@ -129,7 +133,7 @@ public IcebergPageSink( this.session = requireNonNull(session, "session is null"); this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); this.maxOpenWriters = maxOpenWriters; - this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec)); + this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec, outputSchema)); } @Override @@ -326,7 +330,7 @@ private static Optional getPartitionData(List co Object[] values = new Object[columns.size()]; for (int i = 0; i < columns.size(); i++) { PartitionColumn column = columns.get(i); - Block block = page.getBlock(column.getSourceChannel()); + Block block = getPartitionBlock(column, page); Type type = column.getSourceType(); Object value = getIcebergValue(block, position, type); values[i] = applyTransform(column.getField().transform(), value); @@ -334,6 +338,16 @@ private static Optional getPartitionData(List co return Optional.of(new PartitionData(values)); } + private static Block getPartitionBlock(PartitionColumn column, Page page) + { + Iterator pos = column.getSourceChannels().listIterator(); + Block block = page.getBlock(pos.next()); + while (pos.hasNext()) { + block = toColumnarRow(block).getField(pos.next()); + } + return block; + } + @SuppressWarnings("unchecked") private static Object applyTransform(Transform transform, Object value) { @@ -384,7 +398,7 @@ public static Object getIcebergValue(Block block, int position, Type type) throw new UnsupportedOperationException("Type not supported as partition column: " + type.getDisplayName()); } - private static List toPartitionColumns(List handles, PartitionSpec partitionSpec) + private static List toPartitionColumns(List handles, PartitionSpec partitionSpec, Schema schema) { Map idChannels = new HashMap<>(); for (int i = 0; i < handles.size(); i++) { @@ -392,16 +406,24 @@ private static List toPartitionColumns(List { - Integer channel = idChannels.get(field.sourceId()); - checkArgument(channel != null, "partition field not found: %s", field); - Type inputType = handles.get(channel).getType(); - ColumnTransform transform = getColumnTransform(field, inputType); - return new PartitionColumn(field, channel, inputType, transform.getType(), transform.getBlockTransform()); - }) + .map(field -> getPartitionColumn(field, handles, schema.asStruct(), idChannels)) .collect(toImmutableList()); } + private static PartitionColumn getPartitionColumn(PartitionField field, List handles, Types.StructType schema, Map idChannels) + { + List sourceIds = null; + try { + sourceIds = IcebergUtil.getIndexPathToField(schema, field.sourceId()); + } + catch (Exception e) { + checkArgument(sourceIds != null, "partition field not found: %s", field); + } + Type inputType = handles.get(idChannels.get(field.sourceId())).getType(); + ColumnTransform transform = getColumnTransform(field, inputType); + return new PartitionColumn(field, sourceIds, inputType, transform.getType(), transform.getBlockTransform()); + } + private static class WriteContext { private final IcebergFileWriter writer; @@ -449,7 +471,7 @@ public int[] partitionPage(Page page) Block[] blocks = new Block[columns.size()]; for (int i = 0; i < columns.size(); i++) { PartitionColumn column = columns.get(i); - Block block = page.getBlock(column.getSourceChannel()); + Block block = getPartitionBlock(column, page); blocks[i] = column.getBlockTransform().apply(block); } Page transformed = new Page(page.getPositionCount(), blocks); @@ -457,6 +479,18 @@ public int[] partitionPage(Page page) return pageIndexer.indexPage(transformed); } + private Block getPartitionBlock(PartitionColumn column, Page page) + { + ListIterator iterator = column.getSourceChannels().listIterator(); + Block block = page.getBlock(iterator.next()); + + while (iterator.hasNext()) { + block = toColumnarRow(block).getField(iterator.next()); + } + + return block; + } + public int getMaxIndex() { return pageIndexer.getMaxIndex(); @@ -471,15 +505,15 @@ public List getColumns() private static class PartitionColumn { private final PartitionField field; - private final int sourceChannel; + private final List sourceChannels; private final Type sourceType; private final Type resultType; private final Function blockTransform; - public PartitionColumn(PartitionField field, int sourceChannel, Type sourceType, Type resultType, Function blockTransform) + public PartitionColumn(PartitionField field, List sourceChannels, Type sourceType, Type resultType, Function blockTransform) { this.field = requireNonNull(field, "field is null"); - this.sourceChannel = sourceChannel; + this.sourceChannels = sourceChannels; this.sourceType = requireNonNull(sourceType, "sourceType is null"); this.resultType = requireNonNull(resultType, "resultType is null"); this.blockTransform = requireNonNull(blockTransform, "blockTransform is null"); @@ -492,7 +526,12 @@ public PartitionField getField() public int getSourceChannel() { - return sourceChannel; + return sourceChannels.get(0); + } + + public List getSourceChannels() + { + return sourceChannels; } public Type getSourceType() diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index 05e372d0fe32..d1412f5119a0 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -48,6 +48,7 @@ import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; @@ -56,6 +57,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Base64; +import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; @@ -150,6 +152,72 @@ public static Table getIcebergTableWithMetadata( return new BaseTable(operations, quotedTableName(table)); } + public static List getAllColumns(Schema schema, TypeManager typeManager) + { + return TypeUtil + .indexByName(schema.asStruct()) + .keySet() + .stream() + .map(name -> IcebergColumnHandle.create(name, schema.findField(name), typeManager)) + .collect(toImmutableList()); + } + + public static List getNestedColumnNames(Types.StructType schema, Integer sourceId) + { + Map parentIndex = TypeUtil.indexParents(schema); + Map idIndex = TypeUtil.indexById(schema); + LinkedList parentColumns = new LinkedList(); + + parentColumns.addFirst(idIndex.get(sourceId).name()); + Integer current = parentIndex.get(sourceId); + + while (current != null) { + parentColumns.addFirst(idIndex.get(current).name()); + current = parentIndex.get(current); + } + return parentColumns; + } + + private static Integer getFieldPosFromSchema(String name, Types.StructType schema) throws Exception + { + for (int i = 0; i < schema.fields().size(); i++) { + if (schema.fields().get(i).name().contentEquals(name)) { + return i; + } + } + throw new IllegalArgumentException("Could not find field " + name + " in schema"); + } + + public static List getIndexPathToField(Types.StructType schema, Integer sourceId) throws Exception + { + return getIndexPathToField(schema, getNestedColumnNames(schema, sourceId)); + } + + public static List getIndexPathToField(Types.StructType schema, List fieldName) throws Exception + { + List sourceIds = new LinkedList<>(); + Types.StructType current = schema; + + // Iterate over field names while finding position in schema + for (int i = 0; i < fieldName.size(); i++) { + String name = fieldName.get(i); + sourceIds.add(getFieldPosFromSchema(name, current)); + + if (current.field(name).type().isStructType()) { + current = current.field(name).type().asStructType(); + } + else if (i + 1 == fieldName.size()) { + break; + } + else { + String fullFieldName = String.join(".", fieldName); + throw new IllegalArgumentException("Could not find field " + fullFieldName + " in schema"); + } + } + + return sourceIds; + } + public static long resolveSnapshotId(Table table, long snapshotId) { if (table.snapshot(snapshotId) != null) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionFields.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionFields.java index 164c87056060..e54fac7b412b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionFields.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionFields.java @@ -29,7 +29,8 @@ public final class PartitionFields { - private static final String NAME = "[a-z_][a-z0-9_]*"; + private static final String IDENTIFIER = "[a-z_][a-z0-9_]*"; + private static final String NAME = IDENTIFIER + "(\\." + IDENTIFIER + ")*"; private static final String FUNCTION_ARGUMENT_NAME = "\\((" + NAME + ")\\)"; private static final String FUNCTION_ARGUMENT_NAME_AND_INT = "\\((" + NAME + "), *(\\d+)\\)"; @@ -65,8 +66,8 @@ public static void parsePartitionField(PartitionSpec.Builder builder, String fie tryMatch(field, MONTH_PATTERN, match -> builder.month(match.group(1))) || tryMatch(field, DAY_PATTERN, match -> builder.day(match.group(1))) || tryMatch(field, HOUR_PATTERN, match -> builder.hour(match.group(1))) || - tryMatch(field, BUCKET_PATTERN, match -> builder.bucket(match.group(1), parseInt(match.group(2)))) || - tryMatch(field, TRUNCATE_PATTERN, match -> builder.truncate(match.group(1), parseInt(match.group(2)))) || + tryMatch(field, BUCKET_PATTERN, match -> builder.bucket(match.group(1), parseInt(match.group(match.groupCount())))) || + tryMatch(field, TRUNCATE_PATTERN, match -> builder.truncate(match.group(1), parseInt(match.group(match.groupCount())))) || tryMatch(field, VOID_PATTERN, match -> builder.alwaysNull(match.group(1))) || false; if (!matched) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 6456c8e06aba..e20da654d8af 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -2368,6 +2368,40 @@ public void testCreateNestedPartitionedTable() dropTable("test_nested_table_3"); } + @Test + public void testCreateWithNestedPartitionedColTable() + { + // Test INSERT with SELECT values + @Language("SQL") String createTable = + "CREATE TABLE test_nested_partition_table_1 (int INTEGER, str ROW(id INTEGER , vc VARCHAR)) WITH (partitioning = ARRAY['str.id'])"; + assertUpdate(createTable); + @Language("SQL") String insertSql = "INSERT INTO test_nested_partition_table_1 select 1, (CAST(ROW(1, 'this is a random value') AS ROW(int, varchar)))"; + assertUpdate(insertSql, 1); + assertThat(query("SELECT int, str.id, str.vc FROM test_nested_partition_table_1")) + .matches("VALUES (1, 1, CAST('this is a random value' as varchar))"); + + // Test INSERT VALUES + @Language("SQL") String createTable2 = + "CREATE TABLE test_nested_partition_table_2 (int INTEGER, str ROW(id INTEGER , vc VARCHAR)) WITH (partitioning = ARRAY['str.id'])"; + assertUpdate(createTable2); + @Language("SQL") String insertSql2 = "INSERT INTO test_nested_partition_table_2 VALUES (1, ROW(1, 'this is a random value'))"; + assertUpdate(insertSql2, 1); + assertThat(query("SELECT int, str.id, str.vc FROM test_nested_partition_table_2")) + .matches("VALUES (1, 1, CAST('this is a random value' as varchar))"); + + // Test insert with SELECT FROM TABLE + @Language("SQL") String createTable3 = "" + + "CREATE TABLE test_nested_partition_table_3 WITH (partitioning = ARRAY['str.id']) AS SELECT * FROM test_nested_partition_table_1"; + assertUpdate(createTable3, 1); + assertThat(query("SELECT int, str.id, str.vc FROM test_nested_partition_table_3")) + .matches("VALUES (1, 1, CAST('this is a random value' as varchar))"); + + // Clean up + dropTable("test_nested_partition_table_1"); + dropTable("test_nested_partition_table_2"); + dropTable("test_nested_partition_table_3"); + } + @Test public void testSerializableReadIsolation() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestPartitionFields.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestPartitionFields.java index 778be203b061..cf7a573dda58 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestPartitionFields.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestPartitionFields.java @@ -16,6 +16,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.DoubleType; import org.apache.iceberg.types.Types.ListType; import org.apache.iceberg.types.Types.LongType; @@ -38,6 +39,7 @@ public class TestPartitionFields public void testParse() { assertParse("order_key", partitionSpec(builder -> builder.identity("order_key"))); + assertParse("nested.value", partitionSpec(builder -> builder.identity("nested.value"))); assertParse("comment", partitionSpec(builder -> builder.identity("comment"))); assertParse("year(ts)", partitionSpec(builder -> builder.year("ts"))); assertParse("month(ts)", partitionSpec(builder -> builder.month("ts"))); @@ -49,6 +51,7 @@ public void testParse() assertParse("void(order_key)", partitionSpec(builder -> builder.alwaysNull("order_key"))); assertInvalid("bucket()", "Invalid partition field declaration: bucket()"); + assertInvalid(".nested", "Invalid partition field declaration: .nested"); assertInvalid("abc", "Cannot find source column: abc"); assertInvalid("notes", "Cannot partition by non-primitive source field: list"); assertInvalid("bucket(price, 42)", "Cannot bucket by type: double"); @@ -86,7 +89,8 @@ private static PartitionSpec partitionSpec(Consumer consu NestedField.required(2, "ts", TimestampType.withoutZone()), NestedField.required(3, "price", DoubleType.get()), NestedField.optional(4, "comment", StringType.get()), - NestedField.optional(5, "notes", ListType.ofRequired(6, StringType.get()))); + NestedField.optional(5, "notes", ListType.ofRequired(6, StringType.get())), + NestedField.required(7, "nested", Types.StructType.of(NestedField.required(8, "value", StringType.get())))); PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); consumer.accept(builder); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java index ca36bf3bbf7d..870caa859169 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java @@ -351,25 +351,30 @@ public void testSparkReadsTrinoPartitionedTable(StorageFormat storageFormat) String sparkTableName = sparkTableName(baseTableName); onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); - onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _varbinary VARBINARY, _bigint BIGINT) WITH (partitioning = ARRAY['_string', '_varbinary'], format = '%s')", trinoTableName, storageFormat)); - onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', X'0ff102f0feff', 1001), ('b', X'0ff102f0fefe', 1002), ('c', X'0ff102fdfeff', 1003)", trinoTableName)); + onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _varbinary VARBINARY, _bigint BIGINT, nested ROW(data BIGINT)) WITH (partitioning = ARRAY['_string', '_varbinary', 'nested.data'], format = '%s')", trinoTableName, storageFormat)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', X'0ff102f0feff', 1001, ROW(1)), ('b', X'0ff102f0fefe', 1002, ROW(2)), ('c', X'0ff102fdfeff', 1003, ROW(3))", trinoTableName)); - Row row1 = row("b", new byte[]{15, -15, 2, -16, -2, -2}, 1002); - String selectByString = "SELECT * FROM %s WHERE _string = 'b'"; + Row row1 = row("b", new byte[]{15, -15, 2, -16, -2, -2}, 1002, 2); + String selectByString = "SELECT _string, _varbinary, _bigint, nested.data FROM %s WHERE _string = 'b'"; assertThat(onTrino().executeQuery(format(selectByString, trinoTableName))) .containsOnly(row1); assertThat(onSpark().executeQuery(format(selectByString, sparkTableName))) .containsOnly(row1); - Row row2 = row("a", new byte[]{15, -15, 2, -16, -2, -1}, 1001); - String selectByVarbinary = "SELECT * FROM %s WHERE _varbinary = X'0ff102f0feff'"; + Row row2 = row("a", new byte[]{15, -15, 2, -16, -2, -1}, 1001, 1); + String selectByVarbinary = "SELECT _string, _varbinary, _bigint, nested.data FROM %s WHERE _varbinary = X'0ff102f0feff'"; assertThat(onTrino().executeQuery(format(selectByVarbinary, trinoTableName))) .containsOnly(row2); // for now this fails on spark see https://github.com/apache/iceberg/issues/2934 assertQueryFailure(() -> onSpark().executeQuery(format(selectByVarbinary, sparkTableName))) .hasMessageContaining("Cannot convert bytes to SQL literal: java.nio.HeapByteBuffer[pos=0 lim=6 cap=6]"); - onTrino().executeQuery("DROP TABLE " + trinoTableName); + Row row3 = row("a", new byte[]{15, -15, 2, -16, -2, -1}, 1001, 1); + String selectNested = "SELECT _string, _bigint, nested.data FROM %s WHERE nested.data = 1"; + assertThat(onTrino().executeQuery(format(selectNested, trinoTableName))) + .containsOnly(row3); + assertThat(onSpark().executeQuery(format(selectNested, sparkTableName))) + .containsOnly(row3); } @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion")