diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index e05bfff82994..9a57024d1608 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -747,6 +747,18 @@ WITH ( orc_bloom_filter_fpp = 0.05) ``` +The table definition below specifies to use Avro files, partitioning +by the `child1` field in the `parent` column: + +``` +CREATE TABLE test_table ( + data INTEGER, + parent ROW(child1 DOUBLE, child2 INTEGER)) +WITH ( + format = 'AVRO', + partitioning = ARRAY['"parent.child1"']) +``` + (iceberg-metadata-tables)= #### Metadata tables diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergBucketFunction.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergBucketFunction.java index 433466c966a0..071ddde5c42c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergBucketFunction.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergBucketFunction.java @@ -13,28 +13,32 @@ */ package io.trino.plugin.iceberg; +import com.google.common.collect.ImmutableList; import io.trino.plugin.iceberg.PartitionTransforms.ColumnTransform; import io.trino.plugin.iceberg.PartitionTransforms.ValueTransform; import io.trino.spi.Page; import io.trino.spi.block.Block; import io.trino.spi.connector.BucketFunction; +import io.trino.spi.type.RowType; import io.trino.spi.type.Type; import io.trino.spi.type.TypeOperators; import org.apache.iceberg.PartitionSpec; import java.lang.invoke.MethodHandle; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.iceberg.PartitionTransforms.getColumnTransform; +import static 
io.trino.spi.block.RowBlock.getRowFieldsFromBlock; import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.NEVER_NULL; import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL; import static io.trino.spi.function.InvocationConvention.simpleConvention; import static io.trino.spi.type.TypeUtils.NULL_HASH_CODE; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; public class IcebergBucketFunction @@ -59,18 +63,14 @@ public IcebergBucketFunction( this.bucketCount = bucketCount; - Map fieldIdToInputChannel = new HashMap<>(); - for (int i = 0; i < partitioningColumns.size(); i++) { - Integer previous = fieldIdToInputChannel.put(partitioningColumns.get(i).getId(), i); - checkState(previous == null, "Duplicate id %s in %s at %s and %s", partitioningColumns.get(i).getId(), partitioningColumns, i, previous); - } + Map nameToFieldInfo = buildNameToFieldInfo(partitioningColumns); partitionColumns = partitionSpec.fields().stream() .map(field -> { - Integer channel = fieldIdToInputChannel.get(field.sourceId()); - checkArgument(channel != null, "partition field not found: %s", field); - Type inputType = partitioningColumns.get(channel).getType(); - ColumnTransform transform = getColumnTransform(field, inputType); - return new PartitionColumn(channel, transform.getValueTransform(), transform.getType()); + String fieldName = partitionSpec.schema().findColumnName(field.sourceId()); + FieldInfo fieldInfo = nameToFieldInfo.get(fieldName); + checkArgument(fieldInfo != null, "partition field not found: %s", field); + ColumnTransform transform = getColumnTransform(field, fieldInfo.type()); + return new PartitionColumn(fieldInfo.sourceChannel(), transform.getValueTransform(), transform.getType(), fieldInfo.path()); }) .collect(toImmutableList()); hashCodeInvokers = partitionColumns.stream() @@ -79,6 +79,35 @@ public IcebergBucketFunction( .collect(toImmutableList()); } + private 
static void addFieldInfo( + Map nameToFieldInfo, + int sourceChannel, + String fieldName, + List children, + Type type, + List path) + { + if (type instanceof RowType rowType) { + List fields = rowType.getFields(); + checkArgument(children.size() == fields.size(), format("children size (%s) == fields size (%s) is not equal", children.size(), fields.size())); + for (int i = 0; i < fields.size(); i++) { + ColumnIdentity child = children.get(i); + String qualifiedFieldName = fieldName + "." + child.getName(); + path.add(i); + if (fields.get(i).getType() instanceof RowType) { + addFieldInfo(nameToFieldInfo, sourceChannel, qualifiedFieldName, child.getChildren(), fields.get(i).getType(), path); + } + else { + nameToFieldInfo.put(qualifiedFieldName, new FieldInfo(sourceChannel, qualifiedFieldName, fields.get(i).getType(), ImmutableList.copyOf(path))); + } + path.removeLast(); + } + } + else { + nameToFieldInfo.put(fieldName, new FieldInfo(sourceChannel, fieldName, type, ImmutableList.copyOf(path))); + } + } + @Override public int getBucket(Page page, int position) { @@ -87,6 +116,9 @@ public int getBucket(Page page, int position) for (int i = 0; i < partitionColumns.size(); i++) { PartitionColumn partitionColumn = partitionColumns.get(i); Block block = page.getBlock(partitionColumn.getSourceChannel()); + for (int index : partitionColumn.getPath()) { + block = getRowFieldsFromBlock(block).get(index); + } Object value = partitionColumn.getValueTransform().apply(block, position); long valueHash = hashValue(hashCodeInvokers.get(i), value); hash = (31 * hash) + valueHash; @@ -95,6 +127,22 @@ public int getBucket(Page page, int position) return (int) ((hash & Long.MAX_VALUE) % bucketCount); } + private static Map buildNameToFieldInfo(List partitioningColumns) + { + Map nameToFieldInfo = new HashMap<>(); + for (int channel = 0; channel < partitioningColumns.size(); channel++) { + IcebergColumnHandle partitionColumn = partitioningColumns.get(channel); + addFieldInfo( + 
nameToFieldInfo, + channel, + partitionColumn.getName(), + partitionColumn.getColumnIdentity().getChildren(), + partitionColumn.getType(), + new LinkedList<>()); + } + return nameToFieldInfo; + } + private static long hashValue(MethodHandle method, Object value) { if (value == null) { @@ -119,12 +167,14 @@ private static class PartitionColumn private final int sourceChannel; private final ValueTransform valueTransform; private final Type resultType; + private final List path; - public PartitionColumn(int sourceChannel, ValueTransform valueTransform, Type resultType) + public PartitionColumn(int sourceChannel, ValueTransform valueTransform, Type resultType, List path) { this.sourceChannel = sourceChannel; this.valueTransform = requireNonNull(valueTransform, "valueTransform is null"); this.resultType = requireNonNull(resultType, "resultType is null"); + this.path = ImmutableList.copyOf(requireNonNull(path, "path is null")); } public int getSourceChannel() @@ -141,5 +191,20 @@ public ValueTransform getValueTransform() { return valueTransform; } + + public List getPath() + { + return path; + } + } + + private record FieldInfo(int sourceChannel, String name, Type type, List path) + { + FieldInfo + { + requireNonNull(name, "name is null"); + requireNonNull(type, "type is null"); + path = ImmutableList.copyOf(requireNonNull(path, "path is null")); + } } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 309a873ff867..ccf7874a4af6 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -254,12 +254,13 @@ import static io.trino.plugin.iceberg.IcebergUtil.firstSnapshotAfter; import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle; import static io.trino.plugin.iceberg.IcebergUtil.getColumnMetadatas; -import 
static io.trino.plugin.iceberg.IcebergUtil.getColumns; import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties; import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys; +import static io.trino.plugin.iceberg.IcebergUtil.getProjectedColumns; import static io.trino.plugin.iceberg.IcebergUtil.getSnapshotIdAsOfTime; import static io.trino.plugin.iceberg.IcebergUtil.getTableComment; +import static io.trino.plugin.iceberg.IcebergUtil.getTopLevelColumns; import static io.trino.plugin.iceberg.IcebergUtil.newCreateTableTransaction; import static io.trino.plugin.iceberg.IcebergUtil.schemaFromMetadata; import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields; @@ -614,7 +615,7 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con DiscretePredicates discretePredicates = null; if (!partitionSourceIds.isEmpty()) { // Extract identity partition columns - Map columns = getColumns(icebergTable.schema(), typeManager).stream() + Map columns = getProjectedColumns(icebergTable.schema(), typeManager).stream() .filter(column -> partitionSourceIds.contains(column.getId())) .collect(toImmutableMap(IcebergColumnHandle::getId, identity())); @@ -715,7 +716,7 @@ public Map getColumnHandles(ConnectorSession session, Conn { IcebergTableHandle table = checkValidTableHandle(tableHandle); ImmutableMap.Builder columnHandles = ImmutableMap.builder(); - for (IcebergColumnHandle columnHandle : getColumns(SchemaParser.fromJson(table.getTableSchemaJson()), typeManager)) { + for (IcebergColumnHandle columnHandle : getTopLevelColumns(SchemaParser.fromJson(table.getTableSchemaJson()), typeManager)) { columnHandles.put(columnHandle.getName(), columnHandle); } columnHandles.put(FILE_PATH.getColumnName(), pathColumnHandle()); @@ -1041,13 +1042,32 @@ private Optional getWriteLayout(Schema tableSchema, Partit return Optional.empty(); } - 
validateNotPartitionedByNestedField(tableSchema, partitionSpec); - Map columnById = getColumns(tableSchema, typeManager).stream() + Map indexParents = indexParents(tableSchema.asStruct()); + Map columnById = getProjectedColumns(tableSchema, typeManager).stream() .collect(toImmutableMap(IcebergColumnHandle::getId, identity())); List partitioningColumns = partitionSpec.fields().stream() .sorted(Comparator.comparing(PartitionField::sourceId)) - .map(field -> requireNonNull(columnById.get(field.sourceId()), () -> "Cannot find source column for partitioning field " + field)) + .map(field -> { + boolean isBaseColumn = !indexParents.containsKey(field.sourceId()); + int sourceId; + if (isBaseColumn) { + sourceId = field.sourceId(); + } + else { + sourceId = getRootFieldId(indexParents, field.sourceId()); + } + Type sourceType = tableSchema.findType(sourceId); + // The source column, must be a primitive type and cannot be contained in a map or list, but may be nested in a struct. + // https://iceberg.apache.org/spec/#partitioning + if (sourceType.isMapType()) { + throw new TrinoException(NOT_SUPPORTED, "Partitioning field [" + field.name() + "] cannot be contained in a map"); + } + if (sourceType.isListType()) { + throw new TrinoException(NOT_SUPPORTED, "Partitioning field [" + field.name() + "] cannot be contained in a array"); + } + return requireNonNull(columnById.get(sourceId), () -> "Cannot find source column for partition field " + field); + }) .distinct() .collect(toImmutableList()); List partitioningColumnNames = partitioningColumns.stream() @@ -1062,6 +1082,15 @@ private Optional getWriteLayout(Schema tableSchema, Partit return Optional.of(new ConnectorTableLayout(partitioningHandle, partitioningColumnNames, true)); } + private static int getRootFieldId(Map indexParents, int fieldId) + { + int rootFieldId = fieldId; + while (indexParents.containsKey(rootFieldId)) { + rootFieldId = indexParents.get(rootFieldId); + } + return rootFieldId; + } + @Override public 
ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List columns, RetryMode retryMode) { @@ -1069,7 +1098,6 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); validateNotModifyingOldSnapshot(table, icebergTable); - validateNotPartitionedByNestedField(icebergTable.schema(), icebergTable.spec()); beginTransaction(icebergTable); @@ -1084,7 +1112,7 @@ private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName name, transformValues(table.specs(), PartitionSpecParser::toJson), table.spec().specId(), getSupportedSortFields(table.schema(), table.sortOrder()), - getColumns(table.schema(), typeManager), + getProjectedColumns(table.schema(), typeManager), table.location(), getFileFormat(table), table.properties(), @@ -1330,7 +1358,7 @@ private Optional getTableHandleForOptimize( tableHandle.getSnapshotId(), tableHandle.getTableSchemaJson(), tableHandle.getPartitionSpecJson().orElseThrow(() -> new VerifyException("Partition spec missing in the table handle")), - getColumns(SchemaParser.fromJson(tableHandle.getTableSchemaJson()), typeManager), + getProjectedColumns(SchemaParser.fromJson(tableHandle.getTableSchemaJson()), typeManager), icebergTable.sortOrder().fields().stream() .map(TrinoSortField::fromIceberg) .collect(toImmutableList()), @@ -1431,7 +1459,6 @@ private BeginTableExecuteResult OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION) { @@ -1891,7 +1918,6 @@ private static void updatePartitioning(Table icebergTable, Transaction transacti } else { PartitionSpec partitionSpec = parsePartitionFields(schema, partitionColumns); - validateNotPartitionedByNestedField(schema, partitionSpec); Set partitionFields = ImmutableSet.copyOf(partitionSpec.fields()); difference(existingPartitionFields, partitionFields).stream() .map(PartitionField::name) @@ -2372,7 +2398,6 @@ public ConnectorMergeTableHandle 
beginMerge(ConnectorSession session, ConnectorT Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); validateNotModifyingOldSnapshot(table, icebergTable); - validateNotPartitionedByNestedField(icebergTable.schema(), icebergTable.spec()); beginTransaction(icebergTable); @@ -2403,16 +2428,6 @@ private static void validateNotModifyingOldSnapshot(IcebergTableHandle table, Ta } } - public static void validateNotPartitionedByNestedField(Schema schema, PartitionSpec partitionSpec) - { - Map indexParents = indexParents(schema.asStruct()); - for (PartitionField field : partitionSpec.fields()) { - if (indexParents.containsKey(field.sourceId())) { - throw new TrinoException(NOT_SUPPORTED, "Partitioning by nested field is unsupported: " + field.name()); - } - } - } - private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection fragments, RetryMode retryMode) { Table icebergTable = transaction.table(); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java index 4feb274b3c8b..f5e4fa414e89 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java @@ -43,6 +43,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import java.io.Closeable; @@ -56,17 +57,18 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Function; -import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static 
io.airlift.slice.Slices.wrappedBuffer; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_TOO_MANY_OPEN_PARTITIONS; import static io.trino.plugin.iceberg.IcebergSessionProperties.isSortedWritingEnabled; -import static io.trino.plugin.iceberg.IcebergUtil.getColumns; +import static io.trino.plugin.iceberg.IcebergUtil.getTopLevelColumns; import static io.trino.plugin.iceberg.PartitionTransforms.getColumnTransform; import static io.trino.plugin.iceberg.util.Timestamps.getTimestampTz; import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros; +import static io.trino.spi.block.RowBlock.getRowFieldsFromBlock; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.DateType.DATE; @@ -161,7 +163,7 @@ public IcebergPageSink( this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); this.metricsConfig = MetricsConfig.fromProperties(requireNonNull(storageProperties, "storageProperties is null")); this.maxOpenWriters = maxOpenWriters; - this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec)); + this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec, outputSchema)); this.targetMaxFileSize = IcebergSessionProperties.getTargetMaxFileSize(session); this.idleWriterMinFileSize = IcebergSessionProperties.getIdleWriterMinFileSize(session); this.storageProperties = requireNonNull(storageProperties, "storageProperties is null"); @@ -172,7 +174,7 @@ public IcebergPageSink( this.tempDirectory = Location.of(locationProvider.newDataLocation("trino-tmp-files")); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.pageSorter = requireNonNull(pageSorter, "pageSorter is null"); - this.columnTypes = getColumns(outputSchema, typeManager).stream() + this.columnTypes = 
getTopLevelColumns(outputSchema, typeManager).stream() .map(IcebergColumnHandle::getType) .collect(toImmutableList()); @@ -457,7 +459,7 @@ private Optional getPartitionData(List columns, Object[] values = new Object[columns.size()]; for (int i = 0; i < columns.size(); i++) { PartitionColumn column = columns.get(i); - Block block = page.getBlock(column.getSourceChannel()); + Block block = PagePartitioner.getPartitionBlock(column, page); Type type = column.getSourceType(); org.apache.iceberg.types.Type icebergType = outputSchema.findType(column.getField().sourceId()); Object value = getIcebergValue(block, position, type); @@ -525,7 +527,7 @@ public static Object getIcebergValue(Block block, int position, Type type) throw new UnsupportedOperationException("Type not supported as partition column: " + type.getDisplayName()); } - private static List toPartitionColumns(List handles, PartitionSpec partitionSpec) + private static List toPartitionColumns(List handles, PartitionSpec partitionSpec, Schema schema) { Map idChannels = new HashMap<>(); for (int i = 0; i < handles.size(); i++) { @@ -533,16 +535,62 @@ private static List toPartitionColumns(List { - Integer channel = idChannels.get(field.sourceId()); - checkArgument(channel != null, "partition field not found: %s", field); - Type inputType = handles.get(channel).getType(); - ColumnTransform transform = getColumnTransform(field, inputType); - return new PartitionColumn(field, channel, inputType, transform.getType(), transform.getBlockTransform()); - }) + .map(field -> getPartitionColumn(field, handles, schema.asStruct(), idChannels)) .collect(toImmutableList()); } + private static PartitionColumn getPartitionColumn(PartitionField field, List handles, Types.StructType schema, Map idChannels) + { + List sourceChannels = getIndexPathToField(schema, getNestedFieldIds(schema, field.sourceId())); + Type sourceType = handles.get(idChannels.get(field.sourceId())).getType(); + ColumnTransform transform = 
getColumnTransform(field, sourceType); + return new PartitionColumn(field, sourceChannels, sourceType, transform.getType(), transform.getBlockTransform()); + } + + private static List getNestedFieldIds(Types.StructType schema, Integer sourceId) + { + Map parentIndex = TypeUtil.indexParents(schema); + Map idIndex = TypeUtil.indexById(schema); + ImmutableList.Builder parentColumnsBuilder = ImmutableList.builder(); + + parentColumnsBuilder.add(idIndex.get(sourceId).fieldId()); + Integer current = parentIndex.get(sourceId); + + while (current != null) { + parentColumnsBuilder.add(idIndex.get(current).fieldId()); + current = parentIndex.get(current); + } + return parentColumnsBuilder.build().reverse(); + } + + private static List getIndexPathToField(Types.StructType schema, List nestedFieldIds) + { + ImmutableList.Builder sourceIdsBuilder = ImmutableList.builder(); + Types.StructType current = schema; + + // Iterate over field names while finding position in schema + for (int i = 0; i < nestedFieldIds.size(); i++) { + int fieldId = nestedFieldIds.get(i); + sourceIdsBuilder.add(findFieldPosFromSchema(fieldId, current)); + + if (i + 1 < nestedFieldIds.size()) { + checkState(current.field(fieldId).type().isStructType(), "Could not find field " + nestedFieldIds + " in schema"); + current = current.field(fieldId).type().asStructType(); + } + } + return sourceIdsBuilder.build(); + } + + private static int findFieldPosFromSchema(int fieldId, Types.StructType struct) + { + for (int i = 0; i < struct.fields().size(); i++) { + if (struct.fields().get(i).fieldId() == fieldId) { + return i; + } + } + throw new IllegalArgumentException("Could not find field " + fieldId + " in schema"); + } + private static class WriteContext { private final IcebergFileWriter writer; @@ -600,7 +648,7 @@ public int[] partitionPage(Page page) Block[] blocks = new Block[columns.size()]; for (int i = 0; i < columns.size(); i++) { PartitionColumn column = columns.get(i); - Block block = 
page.getBlock(column.getSourceChannel()); + Block block = getPartitionBlock(column, page); blocks[i] = column.getBlockTransform().apply(block); } Page transformed = new Page(page.getPositionCount(), blocks); @@ -608,6 +656,16 @@ public int[] partitionPage(Page page) return pageIndexer.indexPage(transformed); } + private static Block getPartitionBlock(PartitionColumn column, Page page) + { + List sourceChannels = column.getSourceChannels(); + Block block = page.getBlock(sourceChannels.getFirst()); + for (int i = 1; i < sourceChannels.size(); i++) { + block = getRowFieldsFromBlock(block).get(sourceChannels.get(i)); + } + return block; + } + public int getMaxIndex() { return pageIndexer.getMaxIndex(); @@ -622,18 +680,19 @@ public List getColumns() private static class PartitionColumn { private final PartitionField field; - private final int sourceChannel; + private final List sourceChannels; private final Type sourceType; private final Type resultType; private final Function blockTransform; - public PartitionColumn(PartitionField field, int sourceChannel, Type sourceType, Type resultType, Function blockTransform) + public PartitionColumn(PartitionField field, List sourceChannels, Type sourceType, Type resultType, Function blockTransform) { this.field = requireNonNull(field, "field is null"); - this.sourceChannel = sourceChannel; + this.sourceChannels = ImmutableList.copyOf(requireNonNull(sourceChannels, "sourceChannels is null")); this.sourceType = requireNonNull(sourceType, "sourceType is null"); this.resultType = requireNonNull(resultType, "resultType is null"); this.blockTransform = requireNonNull(blockTransform, "blockTransform is null"); + checkState(!sourceChannels.isEmpty(), "sourceChannels is empty"); } public PartitionField getField() @@ -641,9 +700,9 @@ public PartitionField getField() return field; } - public int getSourceChannel() + public List getSourceChannels() { - return sourceChannel; + return sourceChannels; } public Type getSourceType() diff --git 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java index b8fe3e6370fd..c78bc2b827e2 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java @@ -175,6 +175,6 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction schema, partitionsSpecs, pageSink, - tableHandle.inputColumns().size()); + schema.columns().size()); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index e3c1e391f7dd..489f79519c7d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -70,6 +70,7 @@ import org.apache.iceberg.Transaction; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.types.Type.PrimitiveType; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; @@ -231,6 +232,44 @@ public static Table getIcebergTableWithMetadata( return new BaseTable(operations, quotedTableName(table), TRINO_METRICS_REPORTER); } + public static List getProjectedColumns(Schema schema, TypeManager typeManager) + { + ImmutableList.Builder projectedColumns = ImmutableList.builder(); + StructType schemaAsStruct = schema.asStruct(); + Map indexById = TypeUtil.indexById(schemaAsStruct); + Map indexParents = TypeUtil.indexParents(schemaAsStruct); + Map> indexPaths = indexById.entrySet().stream() + .collect(toImmutableMap(Entry::getKey, e -> ImmutableList.copyOf(buildPath(indexParents, e.getKey())))); + + for (Map.Entry entry : indexById.entrySet()) { + int fieldId = entry.getKey(); + 
NestedField childField = entry.getValue(); + NestedField baseField = childField; + + List path = requireNonNull(indexPaths.get(fieldId)); + if (!path.isEmpty()) { + baseField = indexById.get(path.getFirst()); + path = ImmutableList.builder() + .addAll(path.subList(1, path.size())) // Base column id shouldn't exist in IcebergColumnHandle.path + .add(fieldId) // Append the leaf field id + .build(); + } + projectedColumns.add(createColumnHandle(baseField, childField, typeManager, path)); + } + return projectedColumns.build(); + } + + private static List buildPath(Map indexParents, int fieldId) + { + List path = new ArrayList<>(); + while (indexParents.containsKey(fieldId)) { + int parentId = indexParents.get(fieldId); + path.add(parentId); + fieldId = parentId; + } + return ImmutableList.copyOf(path.reversed()); + } + public static Map getIcebergTableProperties(Table icebergTable) { ImmutableMap.Builder properties = ImmutableMap.builder(); @@ -289,7 +328,7 @@ public static Optional getOrcBloomFilterFpp(Map properti .findFirst(); } - public static List getColumns(Schema schema, TypeManager typeManager) + public static List getTopLevelColumns(Schema schema, TypeManager typeManager) { return schema.columns().stream() .map(column -> getColumnHandle(column, typeManager)) @@ -329,14 +368,18 @@ public static Schema updateColumnComment(Schema schema, String columnName, Strin public static IcebergColumnHandle getColumnHandle(NestedField column, TypeManager typeManager) { - Type type = toTrinoType(column.type(), typeManager); + return createColumnHandle(column, column, typeManager, ImmutableList.of()); + } + + private static IcebergColumnHandle createColumnHandle(NestedField baseColumn, NestedField childColumn, TypeManager typeManager, List path) + { return new IcebergColumnHandle( - createColumnIdentity(column), - type, - ImmutableList.of(), - type, - column.isOptional(), - Optional.ofNullable(column.doc())); + createColumnIdentity(baseColumn), + toTrinoType(baseColumn.type(), 
typeManager), + path, + toTrinoType(childColumn.type(), typeManager), + childColumn.isOptional(), + Optional.ofNullable(childColumn.doc())); } public static Schema schemaFromHandles(List columns) @@ -707,7 +750,7 @@ public static Schema schemaFromViewColumns(TypeManager typeManager, List viewColumnsFromSchema(TypeManager typeManager, Schema schema) { - return IcebergUtil.getColumns(schema, typeManager).stream() + return IcebergUtil.getTopLevelColumns(schema, typeManager).stream() .map(column -> new ViewColumn(column.getName(), column.getType().getTypeId(), column.getComment())) .toList(); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java index a2119d68f43f..0ea82a64cbf8 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java @@ -37,6 +37,7 @@ import org.apache.iceberg.TableScan; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.util.StructLikeWrapper; @@ -65,7 +66,6 @@ import static io.trino.spi.type.TypeUtils.writeNativeValue; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toSet; -import static java.util.stream.Collectors.toUnmodifiableSet; public class PartitionTable implements SystemTable @@ -142,10 +142,7 @@ public ConnectorTableMetadata getTableMetadata() private static List getAllPartitionFields(Table icebergTable) { - Set existingColumnsIds = icebergTable.schema() - .columns().stream() - .map(NestedField::fieldId) - .collect(toUnmodifiableSet()); + Set existingColumnsIds = TypeUtil.indexById(icebergTable.schema().asStruct()).keySet(); List visiblePartitionFields = 
icebergTable.specs() .values().stream() diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsReader.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsReader.java index 1ecc006d48cc..1f9849aa1c96 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsReader.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsReader.java @@ -63,10 +63,10 @@ import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId; import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled; -import static io.trino.plugin.iceberg.IcebergUtil.getColumns; import static io.trino.plugin.iceberg.IcebergUtil.getFileModifiedTimePathDomain; import static io.trino.plugin.iceberg.IcebergUtil.getModificationTime; import static io.trino.plugin.iceberg.IcebergUtil.getPathDomain; +import static io.trino.plugin.iceberg.IcebergUtil.getTopLevelColumns; import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone; import static io.trino.spi.type.TimeZoneKey.UTC_KEY; import static io.trino.spi.type.VarbinaryType.VARBINARY; @@ -128,7 +128,7 @@ public static TableStatistics makeTableStatistics( Schema icebergTableSchema = icebergTable.schema(); List columns = icebergTableSchema.columns(); - List columnHandles = getColumns(icebergTableSchema, typeManager); + List columnHandles = getTopLevelColumns(icebergTableSchema, typeManager); Map idToColumnHandle = columnHandles.stream() .collect(toUnmodifiableMap(IcebergColumnHandle::getId, identity())); Map idToType = columns.stream() diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunction.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunction.java index 2b5f8798bb4a..497820d65ef7 100644 --- 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunction.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunction.java @@ -152,7 +152,7 @@ public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransact columns.add(new Descriptor.Field(DATA_CHANGE_ORDINAL_NAME, Optional.of(INTEGER))); ImmutableList.Builder columnHandlesBuilder = ImmutableList.builder(); - IcebergUtil.getColumns(tableSchema, typeManager).forEach(columnHandlesBuilder::add); + IcebergUtil.getTopLevelColumns(tableSchema, typeManager).forEach(columnHandlesBuilder::add); columnHandlesBuilder.add(new IcebergColumnHandle( new ColumnIdentity(DATA_CHANGE_TYPE_ID, DATA_CHANGE_TYPE_NAME, PRIMITIVE, ImmutableList.of()), VARCHAR, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 78ab210fe2ce..a1afe8d39feb 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -1051,17 +1051,194 @@ public void testCreatePartitionedTableWithNestedTypes() } @Test - public void testCreatePartitionedTableWithNestedField() + public void testCreateTableWithUnsupportedNestedFieldPartitioning() { assertQueryFails( - "CREATE TABLE test_partitioned_table_nested_field(parent ROW(child VARCHAR)) WITH (partitioning = ARRAY['\"parent.child\"'])", - "\\QPartitioning by nested field is unsupported: parent.child"); + "CREATE TABLE test_partitioned_table_nested_field_3 (grandparent ROW(parent ROW(child VARCHAR))) WITH (partitioning = ARRAY['\"grandparent.parent\"'])", + "\\QUnable to parse partitioning value: Cannot partition by non-primitive source field: struct<3: child: optional string>"); assertQueryFails( - "CREATE TABLE 
test_partitioned_table_nested_field(grandparent ROW(parent ROW(child VARCHAR))) WITH (partitioning = ARRAY['\"grandparent.parent.child\"'])", - "\\QPartitioning by nested field is unsupported: grandparent.parent.child"); + "CREATE TABLE test_partitioned_table_nested_field_inside_array (parent ARRAY(ROW(child VARCHAR))) WITH (partitioning = ARRAY['\"parent.child\"'])", + "\\QPartitioning field [parent.child] cannot be contained in a array"); assertQueryFails( - "CREATE TABLE test_partitioned_table_nested_field(grandparent ROW(parent ROW(child VARCHAR))) WITH (partitioning = ARRAY['\"grandparent.parent\"'])", - "\\QUnable to parse partitioning value: Cannot partition by non-primitive source field: struct<3: child: optional string>"); + "CREATE TABLE test_partitioned_table_nested_field_inside_map (parent MAP(ROW(child INTEGER), ARRAY(VARCHAR))) WITH (partitioning = ARRAY['\"parent.key.child\"'])", + "\\QPartitioning field [parent.key.child] cannot be contained in a map"); + assertQueryFails( + "CREATE TABLE test_partitioned_table_nested_field_year_transform_in_string (parent ROW(child VARCHAR)) WITH (partitioning = ARRAY['year(\"parent.child\")'])", + "\\QUnable to parse partitioning value: Invalid source type string for transform: year"); + } + + @Test + public void testNestedFieldPartitionedTable() + { + String tableName = "test_nested_field_partitioned_table_" + randomNameSuffix(); + assertQuerySucceeds("CREATE TABLE " + tableName + "(id INTEGER, name VARCHAR, parent ROW(child VARCHAR, child2 VARCHAR))" + + " WITH (partitioning = ARRAY['id', '\"parent.child\"', '\"parent.child2\"'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'presto', ROW('a', 'b'))", 1); + + assertThat(query("SELECT id, name, parent.child, parent.child2 FROM " + tableName)) + .skippingTypesCheck() + .matches("VALUES (1, 'presto', 'a', 'b')"); + + assertUpdate("UPDATE " + tableName + " SET name = 'trino' WHERE parent.child = 'a'", 1); + assertQuerySucceeds("DELETE FROM " + 
tableName); + assertThat(query("SELECT * FROM " + tableName)) + .returnsEmptyResult(); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'trino', ROW('a', 'b'))", 1); + + assertThat(query("SELECT id, name, parent.child, parent.child2 FROM " + tableName)) + .skippingTypesCheck() + .matches("VALUES (1, 'trino', 'a', 'b')"); + + String newTableName = "test_nested_field_partitioned_table_" + randomNameSuffix(); + assertQuerySucceeds("ALTER TABLE " + tableName + " RENAME TO " + newTableName); + + assertQuerySucceeds(withSingleWriterPerTask(getSession()), "ALTER TABLE " + newTableName + " EXECUTE OPTIMIZE"); + assertQuerySucceeds(prepareCleanUpSession(), "ALTER TABLE " + newTableName + " EXECUTE expire_snapshots(retention_threshold => '0s')"); + + assertThat(query("SELECT id, name, parent.child, parent.child2 FROM " + newTableName)) + .skippingTypesCheck() + .matches("VALUES (1, 'trino', 'a', 'b')"); + + assertUpdate("DROP TABLE " + newTableName); + } + + @Test + public void testMultipleLevelNestedFieldPartitionedTable() + { + String tableName = "test_multiple_level_nested_field_partitioned_table_" + randomNameSuffix(); + assertQuerySucceeds("CREATE TABLE " + tableName + "(id INTEGER, gradparent ROW(parent ROW(child VARCHAR)))" + + " WITH (partitioning = ARRAY['\"gradparent.parent.child\"'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, ROW(ROW('trino')))", 1); + + assertThat(query("SELECT id, gradparent.parent.child FROM " + tableName)) + .skippingTypesCheck() + .matches("VALUES (1, 'trino')"); + + assertUpdate("UPDATE " + tableName + " SET id = 2 WHERE gradparent.parent.child = 'trino'", 1); + assertQuerySucceeds("DELETE FROM " + tableName); + assertThat(query("SELECT * FROM " + tableName)) + .returnsEmptyResult(); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, ROW(ROW('trino')))", 1); + + assertThat(query("SELECT id, gradparent.parent.child FROM " + tableName)) + .skippingTypesCheck() + .matches("VALUES (3, 'trino')"); + + String 
newTableName = "test_multiple_level_nested_field_partitioned_table_" + randomNameSuffix(); + assertQuerySucceeds("ALTER TABLE " + tableName + " RENAME TO " + newTableName); + + assertQuerySucceeds(withSingleWriterPerTask(getSession()), "ALTER TABLE " + newTableName + " EXECUTE OPTIMIZE"); + assertQuerySucceeds(prepareCleanUpSession(), "ALTER TABLE " + newTableName + " EXECUTE expire_snapshots(retention_threshold => '0s')"); + + assertThat(query("SELECT id, gradparent.parent.child FROM " + newTableName)) + .skippingTypesCheck() + .matches("VALUES (3, 'trino')"); + + assertUpdate("DROP TABLE " + newTableName); + } + + @Test + public void testNestedFieldPartitionedTableHavingSameChildName() + { + String tableName = "test_nested_field_partitioned_table_having_same_child_name_" + randomNameSuffix(); + assertQuerySucceeds("CREATE TABLE " + tableName + "(id INTEGER, gradparent ROW(parent ROW(child VARCHAR)), parent ROW(child VARCHAR))" + + " WITH (partitioning = ARRAY['\"gradparent.parent.child\"'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, ROW(ROW('trino')), ROW('trinodb'))", 1); + + assertThat(query("SELECT id, gradparent.parent.child, parent.child FROM " + tableName)) + .skippingTypesCheck() + .matches("VALUES (1, 'trino', 'trinodb')"); + + assertUpdate("UPDATE " + tableName + " SET id = 2 WHERE gradparent.parent.child = 'trino'", 1); + assertQuerySucceeds("DELETE FROM " + tableName); + assertThat(query("SELECT * FROM " + tableName)) + .returnsEmptyResult(); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, ROW(ROW('trino')), ROW('trinodb'))", 1); + + assertThat(query("SELECT id, gradparent.parent.child, parent.child FROM " + tableName)) + .skippingTypesCheck() + .matches("VALUES (3, 'trino', 'trinodb')"); + + String newTableName = "test_nested_field_partitioned_table_having_same_child_name_" + randomNameSuffix(); + assertQuerySucceeds("ALTER TABLE " + tableName + " RENAME TO " + newTableName); + + 
assertQuerySucceeds(withSingleWriterPerTask(getSession()), "ALTER TABLE " + newTableName + " EXECUTE OPTIMIZE"); + assertQuerySucceeds(prepareCleanUpSession(), "ALTER TABLE " + newTableName + " EXECUTE expire_snapshots(retention_threshold => '0s')"); + + assertThat(query("SELECT id, gradparent.parent.child, parent.child FROM " + newTableName)) + .skippingTypesCheck() + .matches("VALUES (3, 'trino', 'trinodb')"); + + assertUpdate("DROP TABLE " + newTableName); + } + + @Test + public void testMergeWithNestedFieldPartitionedTable() + { + String sourceTable = "test_merge_with_nested_field_partitioned_table_source_" + randomNameSuffix(); + String targetTable = "test_merge_with_nested_field_partitioned_table_target_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE " + sourceTable + " (customer VARCHAR, purchases INT, address ROW (city VARCHAR))" + + " WITH (partitioning = ARRAY['\"address.city\"'])"); + assertUpdate( + "INSERT INTO " + sourceTable + " (customer, purchases, address)" + + " VALUES ('Aaron', 6, ROW('Arches')), ('Ed', 7, ROW('Etherville')), ('Carol', 9, ROW('Centreville')), ('Dave', 11, ROW('Darbyshire'))", + 4); + + assertUpdate("CREATE TABLE " + targetTable + " (customer VARCHAR, purchases INT, address ROW (city VARCHAR))" + + " WITH (partitioning = ARRAY['\"address.city\"'])"); + assertUpdate( + "INSERT INTO " + targetTable + " (customer, purchases, address) " + + " VALUES ('Aaron', 5, ROW('Antioch')), ('Bill', 7, ROW('Buena')), ('Carol', 3, ROW('Cambridge')), ('Dave', 11, ROW('Devon'))", + 4); + + String sql = "MERGE INTO " + targetTable + " t USING " + sourceTable + " s ON (t.customer = s.customer)" + + " WHEN MATCHED AND s.address.city = 'Centreville' THEN DELETE" + + " WHEN MATCHED THEN UPDATE SET purchases = s.purchases + t.purchases" + + " WHEN NOT MATCHED THEN INSERT (customer, purchases, address) VALUES (s.customer, s.purchases, s.address)"; + + assertUpdate(sql, 4); + + assertQuery( + "SELECT customer, purchases, address.city FROM " + 
targetTable, + "VALUES ('Aaron', 11, 'Antioch'), ('Ed', 7, 'Etherville'), ('Bill', 7, 'Buena'), ('Dave', 22, 'Devon')"); + + assertUpdate("DROP TABLE " + sourceTable); + assertUpdate("DROP TABLE " + targetTable); + } + + @Test + public void testSchemaEvolutionWithNestedFieldPartitioning() + { + String tableName = "test_schema_evolution_with_nested_field_partitioning_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (c1 bigint, parent1 ROW(child VARCHAR), parent2 ROW(child VARCHAR)) WITH (partitioning = ARRAY['\"parent1.child\"'])"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, ROW('BLR'), ROW('BLR'))", 1); + assertQuery("SELECT c1, parent1.child, parent2.child from " + tableName, "VALUES (1, 'BLR', 'BLR')"); + + // Drop end column + assertUpdate("ALTER TABLE " + tableName + " DROP COLUMN parent2"); + assertQuery("SELECT c1, parent1.child FROM " + tableName, "VALUES (1, 'BLR')"); + + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN parent3 ROW(child VARCHAR)"); + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN parent4 ROW(child VARCHAR)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, ROW('DEL'), ROW('DL'), ROW('IN'))", 1); + assertQuery("SELECT c1, parent1.child, parent3.child, parent4.child FROM " + tableName, "VALUES (1, 'BLR', NULL, NULL), (2, 'DEL', 'DL', 'IN')"); + + // Drop a column (parent3) from middle of table + assertUpdate("ALTER TABLE " + tableName + " DROP COLUMN parent3"); + assertQuery("SELECT c1, parent1.child, parent4.child FROM " + tableName, "VALUES (1, 'BLR', NULL), (2, 'DEL', 'IN')"); + + // Rename nested column + assertUpdate("ALTER TABLE " + tableName + " RENAME COLUMN parent4 TO renamed_parent"); + + // Rename nested partitioned column + assertUpdate("ALTER TABLE " + tableName + " RENAME COLUMN parent1 TO renamed_partitioned_parent"); + + assertQuery("SHOW COLUMNS FROM " + tableName, "VALUES " + + "('c1', 'bigint', '', ''), " + + "('renamed_partitioned_parent', 'row(child varchar)', 
'', ''), " + + "('renamed_parent', 'row(child varchar)', '', '')"); + assertUpdate("DROP TABLE " + tableName); } @Test @@ -1576,10 +1753,13 @@ public void testDuplicatedFieldNames() public void testDropPartitionColumn() { String tableName = "test_drop_partition_column_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + " (id INTEGER, name VARCHAR, age INTEGER) WITH (partitioning = ARRAY['id', 'truncate(name, 5)', 'void(age)'])"); + assertUpdate("CREATE TABLE " + tableName + " (id INTEGER, name VARCHAR, age INTEGER, nested ROW(f1 integer, f2 integer)) " + + "WITH (partitioning = ARRAY['id', 'truncate(name, 5)', 'void(age)', '\"nested.f1\"'])"); assertQueryFails("ALTER TABLE " + tableName + " DROP COLUMN id", "Cannot drop partition field: id"); assertQueryFails("ALTER TABLE " + tableName + " DROP COLUMN name", "Cannot drop partition field: name"); assertQueryFails("ALTER TABLE " + tableName + " DROP COLUMN age", "Cannot drop partition field: age"); + assertQueryFails("ALTER TABLE " + tableName + " DROP COLUMN nested", "Failed to drop column.*"); + assertQueryFails("ALTER TABLE " + tableName + " DROP COLUMN nested.f1", "Cannot drop partition field: nested.f1"); assertUpdate("DROP TABLE " + tableName); } @@ -2028,6 +2208,45 @@ public void testPartitionPredicatePushdownWithHistoricalPartitionSpecs() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testPartitionPredicatePushdownWithNestedFieldPartitioning() + { + // Start with a bucket transform, which cannot be used for predicate pushdown + String tableName = "test_partition_predicate_pushdown_with_nested_field_partitioning"; + assertUpdate("CREATE TABLE " + tableName + " (parent ROW(child1 TIMESTAMP(6), child2 INTEGER)) WITH (partitioning = ARRAY['bucket(\"parent.child2\", 3)'])"); + String selectQuery = "SELECT parent.child2 FROM " + tableName + " WHERE CAST(parent.child1 AS date) < DATE '2015-01-02'"; + + String initialValues = + "ROW(ROW(TIMESTAMP '1969-12-31 22:22:22.222222', 8))," 
+ + "ROW(ROW(TIMESTAMP '1969-12-31 23:33:11.456789', 9))," + + "ROW(ROW(TIMESTAMP '1969-12-31 23:44:55.567890', 10))"; + assertUpdate("INSERT INTO " + tableName + " VALUES " + initialValues, 3); + assertThat(query(selectQuery)) + .containsAll("VALUES 8, 9, 10") + .isNotFullyPushedDown(FilterNode.class); + + String hourTransformValues = + "ROW(ROW(TIMESTAMP '2015-01-01 10:01:23.123456', 1))," + + "ROW(ROW(TIMESTAMP '2015-01-02 10:10:02.987654', 2))," + + "ROW(ROW(TIMESTAMP '2015-01-03 10:55:00.456789', 3))"; + // While the bucket transform is still used, the hour transform cannot be used for pushdown + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['hour(\"parent.child1\")']"); + assertUpdate("INSERT INTO " + tableName + " VALUES " + hourTransformValues, 3); + assertThat(query(selectQuery)) + .containsAll("VALUES 1, 8, 9, 10") + .isNotFullyPushedDown(FilterNode.class); + + // The old partition scheme is no longer used so pushdown using the hour transform is allowed + assertUpdate("DELETE FROM " + tableName + " WHERE year(parent.child1) = 1969", 3); + assertUpdate("ALTER TABLE " + tableName + " EXECUTE optimize"); + assertUpdate("INSERT INTO " + tableName + " VALUES " + initialValues, 3); + assertThat(query(selectQuery)) + .containsAll("VALUES 1, 8, 9, 10") + .isFullyPushedDown(); + + assertUpdate("DROP TABLE " + tableName); + } + @Test public void testDayTransformDate() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPartitionEvolution.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPartitionEvolution.java index 8d8c1313d6cd..e6a84857053f 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPartitionEvolution.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPartitionEvolution.java @@ -155,13 +155,127 @@ public void testChangePartitionTransform() } @Test - public void testUnsupportedNestedFieldPartition() + 
public void testAddNestedPartitioning() { - String tableName = "test_unsupported_nested_field_partition_" + randomNameSuffix(); - assertUpdate("CREATE TABLE " + tableName + "(parent ROW(child VARCHAR))"); - assertQueryFails( - "ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['\"parent.child\"']", - "Partitioning by nested field is unsupported: parent.child"); + String tableName = "test_add_nested_partition_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id INT, district ROW(name VARCHAR), state ROW(name VARCHAR)) WITH (partitioning = ARRAY['\"state.name\"'])"); + assertUpdate( + "INSERT INTO " + tableName + " VALUES " + + "(1, ROW('Patna'), ROW('BH')), " + + "(2, ROW('Gaya'), ROW('BH')), " + + "(3, ROW('Bengaluru'), ROW('KA')), " + + "(4, ROW('Mengaluru'), ROW('KA'))", + 4); + + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['\"state.name\"', '\"district.name\"']"); + + assertThat((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue()).contains("partitioning = ARRAY['\"state.name\"','\"district.name\"']"); + + assertUpdate( + "INSERT INTO " + tableName + " VALUES " + + "(1, ROW('Patna'), ROW('BH')), " + + "(2, ROW('Patna'), ROW('BH')), " + + "(3, ROW('Bengaluru'), ROW('KA')), " + + "(4, ROW('Mengaluru'), ROW('KA'))", + 4); + + List<MaterializedRow> files = computeActual("SELECT file_path, record_count FROM \"" + tableName + "$files\"").getMaterializedRows(); + List<MaterializedRow> initialPartitionedFiles = files.stream() + .filter(file -> !((String) file.getField(0)).contains("district.name=")) + .collect(toImmutableList()); + + List<MaterializedRow> laterPartitionedFiles = files.stream() + .filter(file -> ((String) file.getField(0)).contains("district.name=")) + .collect(toImmutableList()); + + assertThat(initialPartitionedFiles).hasSize(2); + assertThat(initialPartitionedFiles.stream().mapToLong(row -> (long) row.getField(1)).sum()).isEqualTo(4L); + + assertThat(laterPartitionedFiles).hasSize(3); +
assertThat(laterPartitionedFiles.stream().mapToLong(row -> (long) row.getField(1)).sum()).isEqualTo(4L); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testRemoveNestedPartitioning() + { + String tableName = "test_remove_nested_partition_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id INT, district ROW(name VARCHAR), state ROW(name VARCHAR)) WITH (partitioning = ARRAY['\"state.name\"'])"); + assertUpdate( + "INSERT INTO " + tableName + " VALUES " + + "(1, ROW('Patna'), ROW('BH')), " + + "(2, ROW('Gaya'), ROW('BH')), " + + "(3, ROW('Bengaluru'), ROW('KA')), " + + "(4, ROW('Mengaluru'), ROW('KA'))", + 4); + + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY[]"); + + assertUpdate( + "INSERT INTO " + tableName + " VALUES " + + "(1, ROW('Patna'), ROW('BH')), " + + "(2, ROW('Gaya'), ROW('BH')), " + + "(3, ROW('Bengaluru'), ROW('KA')), " + + "(4, ROW('Mengaluru'), ROW('KA'))", + 4); + + List<MaterializedRow> files = computeActual("SELECT file_path, record_count FROM \"" + tableName + "$files\"").getMaterializedRows(); + List<MaterializedRow> unpartitionedFiles = files.stream() + .filter(file -> !((String) file.getField(0)).contains("state.name=")) + .collect(toImmutableList()); + + List<MaterializedRow> partitionedFiles = files.stream() + .filter(file -> ((String) file.getField(0)).contains("state.name=")) + .collect(toImmutableList()); + + assertThat(partitionedFiles).hasSize(2); + assertThat(partitionedFiles.stream().mapToLong(row -> (long) row.getField(1)).sum()).isEqualTo(4L); + + assertThat(unpartitionedFiles).hasSize(1); + assertThat((long) unpartitionedFiles.get(0).getField(1)).isEqualTo(4); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testNestedFieldChangePartitionTransform() + { + String tableName = "test_nested_field_change_partition_transform_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (grandparent ROW(parent ROW(ts TIMESTAMP(6), a INT), b INT), c INT) " + + "WITH
(partitioning = ARRAY['year(\"grandparent.parent.ts\")'])"); + assertUpdate( + "INSERT INTO " + tableName + " VALUES " + + "(ROW(ROW(TIMESTAMP '2021-01-01 01:01:01.111111', 1), 1), 1), " + + "(ROW(ROW(TIMESTAMP '2022-02-02 02:02:02.222222', 2), 2), 2), " + + "(ROW(ROW(TIMESTAMP '2023-03-03 03:03:03.333333', 3), 3), 3)", + 3); + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['month(\"grandparent.parent.ts\")']"); + assertUpdate( + "INSERT INTO " + tableName + " VALUES " + + "(ROW(ROW(TIMESTAMP '2024-04-04 04:04:04.444444', 4), 4), 4), " + + "(ROW(ROW(TIMESTAMP '2025-05-05 05:05:05.555555', 5), 5), 5)", + 2); + + assertThat((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue()).contains("partitioning = ARRAY['month(\"grandparent.parent.ts\")']"); + + List<MaterializedRow> files = computeActual("SELECT file_path, record_count FROM \"" + tableName + "$files\"").getMaterializedRows(); + List<MaterializedRow> yearPartitionedFiles = files.stream() + .filter(file -> { + String filePath = ((String) file.getField(0)); + return filePath.contains("grandparent.parent.ts_year=") && !filePath.contains("grandparent.parent.ts_month="); + }) + .collect(toImmutableList()); + + List<MaterializedRow> monthPartitionedFiles = files.stream() + .filter(file -> { + String filePath = ((String) file.getField(0)); + return !filePath.contains("grandparent.parent.ts_year=") && filePath.contains("grandparent.parent.ts_month="); + }) + .collect(toImmutableList()); + + assertThat(yearPartitionedFiles).hasSize(3); + assertThat(monthPartitionedFiles).hasSize(2); assertUpdate("DROP TABLE " + tableName); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergUtil.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergUtil.java index 00cd2d84f599..39db02d94ba9 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergUtil.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergUtil.java @@ -13,11
+13,18 @@ */ package io.trino.plugin.iceberg; +import com.google.common.collect.ImmutableList; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.NestedField; import org.junit.jupiter.api.Test; +import static io.trino.plugin.iceberg.IcebergUtil.getProjectedColumns; import static io.trino.plugin.iceberg.IcebergUtil.parseVersion; +import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.groups.Tuple.tuple; public class TestIcebergUtil { @@ -56,4 +63,27 @@ public void testParseVersion() assertThatThrownBy(() -> parseVersion("v-10.metadata.json")) .hasMessageMatching("Invalid metadata file name:.*"); } + + @Test + public void testGetProjectedColumns() + { + Schema schema = new Schema( + NestedField.required(1, "id", Types.LongType.get()), + NestedField.required(2, "nested", Types.StructType.of( + NestedField.required(3, "value", Types.StringType.get()), + NestedField.required(4, "list", Types.ListType.ofRequired(5, Types.StringType.get())), + NestedField.required(6, "nested", Types.StructType.of( + NestedField.required(7, "value", Types.StringType.get())))))); + + assertThat(getProjectedColumns(schema, TESTING_TYPE_MANAGER)) + .extracting(IcebergColumnHandle::getId, IcebergColumnHandle::getName, column -> column.getBaseColumn().getId(), IcebergColumnHandle::getPath) + .containsExactly( + tuple(1, "id", 1, ImmutableList.of()), + tuple(2, "nested", 2, ImmutableList.of()), + tuple(3, "value", 2, ImmutableList.of(3)), + tuple(4, "list", 2, ImmutableList.of(4)), + tuple(5, "element", 2, ImmutableList.of(4, 5)), + tuple(6, "nested", 2, ImmutableList.of(6)), + tuple(7, "value", 2, ImmutableList.of(6, 7))); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index 0c439531e8f3..376326a3c480 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import io.trino.Session; import io.trino.filesystem.FileEntry; import io.trino.filesystem.FileIterator; @@ -34,6 +35,7 @@ import io.trino.spi.type.TestingTypeManager; import io.trino.spi.type.TypeManager; import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.MaterializedRow; import io.trino.testing.QueryRunner; import io.trino.testing.sql.TestTable; import org.apache.iceberg.BaseTable; @@ -67,15 +69,19 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.getOnlyElement; import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory; @@ -1022,6 +1028,247 @@ public void testReadingSnapshotReference() ".*?Cannot find snapshot with reference name: TEST-TAG"); } + @Test + public void testNestedFieldPartitioning() + { + String tableName = "test_nested_field_partitioning_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id INT, district 
ROW(name VARCHAR), state ROW(name VARCHAR)) WITH (partitioning = ARRAY['\"state.name\"'])"); + + assertUpdate( + "INSERT INTO " + tableName + " VALUES " + + "(1, ROW('Patna'), ROW('BH')), " + + "(2, ROW('Patna'), ROW('BH')), " + + "(3, ROW('Bengaluru'), ROW('KA')), " + + "(4, ROW('Bengaluru'), ROW('KA'))", + 4); + assertUpdate( + "INSERT INTO " + tableName + " VALUES " + + "(5, ROW('Patna'), ROW('BH')), " + + "(6, ROW('Patna'), ROW('BH')), " + + "(7, ROW('Bengaluru'), ROW('KA')), " + + "(8, ROW('Bengaluru'), ROW('KA'))", + 4); + assertThat(loadTable(tableName).newScan().planFiles()).hasSize(4); + + assertUpdate("DELETE FROM " + tableName + " WHERE district.name = 'Bengaluru'", 4); + assertThat(loadTable(tableName).newScan().planFiles()).hasSize(4); + + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['\"state.name\"', '\"district.name\"']"); + Table icebergTable = updateTableToV2(tableName); + assertThat(icebergTable.spec().fields().stream().map(PartitionField::name).toList()) + .containsExactlyInAnyOrder("state.name", "district.name"); + + assertUpdate( + "INSERT INTO " + tableName + " VALUES " + + "(9, ROW('Patna'), ROW('BH')), " + + "(10, ROW('Bengaluru'), ROW('BH')), " + + "(11, ROW('Bengaluru'), ROW('KA')), " + + "(12, ROW('Bengaluru'), ROW('KA'))", + 4); + assertThat(loadTable(tableName).newScan().planFiles()).hasSize(7); + + assertQuery("SELECT id, district.name, state.name FROM " + tableName, "VALUES " + + "(1, 'Patna', 'BH'), " + + "(2, 'Patna', 'BH'), " + + "(5, 'Patna', 'BH'), " + + "(6, 'Patna', 'BH'), " + + "(9, 'Patna', 'BH'), " + + "(10, 'Bengaluru', 'BH'), " + + "(11, 'Bengaluru', 'KA'), " + + "(12, 'Bengaluru', 'KA')"); + + assertUpdate("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + assertThat(loadTable(tableName).newScan().planFiles()).hasSize(3); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testHighlyNestedFieldPartitioning() + { + String tableName = 
"test_highly_nested_field_partitioning_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id INT, country ROW(name VARCHAR, state ROW(name VARCHAR, district ROW(name VARCHAR))))" + + " WITH (partitioning = ARRAY['\"country.state.district.name\"'])"); + + assertUpdate( + "INSERT INTO " + tableName + " VALUES " + + "(1, ROW('India', ROW('BH', ROW('Patna')))), " + + "(2, ROW('India', ROW('BH', ROW('Patna')))), " + + "(3, ROW('India', ROW('KA', ROW('Bengaluru')))), " + + "(4, ROW('India', ROW('KA', ROW('Bengaluru'))))", + 4); + assertUpdate( + "INSERT INTO " + tableName + " VALUES " + + "(5, ROW('India', ROW('BH', ROW('Patna')))), " + + "(6, ROW('India', ROW('BH', ROW('Patna')))), " + + "(7, ROW('India', ROW('KA', ROW('Bengaluru')))), " + + "(8, ROW('India', ROW('KA', ROW('Bengaluru'))))", + 4); + assertThat(loadTable(tableName).newScan().planFiles()).hasSize(4); + + assertQuery("SELECT partition.\"country.state.district.name\" FROM \"" + tableName + "$partitions\"", "VALUES 'Patna', 'Bengaluru'"); + + assertUpdate("DELETE FROM " + tableName + " WHERE country.state.district.name = 'Bengaluru'", 4); + assertThat(loadTable(tableName).newScan().planFiles()).hasSize(2); + + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['\"country.state.district.name\"', '\"country.state.name\"']"); + Table icebergTable = updateTableToV2(tableName); + assertThat(icebergTable.spec().fields().stream().map(PartitionField::name).toList()) + .containsExactlyInAnyOrder("country.state.district.name", "country.state.name"); + + assertUpdate( + "INSERT INTO " + tableName + " VALUES " + + "(9, ROW('India', ROW('BH', ROW('Patna')))), " + + "(10, ROW('India', ROW('BH', ROW('Bengaluru')))), " + + "(11, ROW('India', ROW('KA', ROW('Bengaluru')))), " + + "(12, ROW('India', ROW('KA', ROW('Bengaluru'))))", + 4); + + assertThat(loadTable(tableName).newScan().planFiles()).hasSize(5); + + assertQuery("SELECT id, country.name, country.state.name, 
country.state.district.name FROM " + tableName, "VALUES " + + "(1, 'India', 'BH', 'Patna'), " + + "(2, 'India', 'BH', 'Patna'), " + + "(5, 'India', 'BH', 'Patna'), " + + "(6, 'India', 'BH', 'Patna'), " + + "(9, 'India', 'BH', 'Patna'), " + + "(10, 'India', 'BH', 'Bengaluru'), " + + "(11, 'India', 'KA', 'Bengaluru'), " + + "(12, 'India', 'KA', 'Bengaluru')"); + + assertUpdate("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE"); + assertThat(loadTable(tableName).newScan().planFiles()).hasSize(3); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testHighlyNestedFieldPartitioningWithTruncateTransform() + { + String tableName = "test_highly_nested_field_partitioning_with_transform_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id INT, country ROW(name VARCHAR, state ROW(name VARCHAR, district ROW(name VARCHAR))))" + + " WITH (partitioning = ARRAY['truncate(\"country.state.district.name\", 5)'])"); + + assertUpdate( + "INSERT INTO " + tableName + " VALUES " + + "(1, ROW('India', ROW('BH', ROW('Patna')))), " + + "(2, ROW('India', ROW('BH', ROW('Patna_Truncate')))), " + + "(3, ROW('India', ROW('DL', ROW('Delhi')))), " + + "(4, ROW('India', ROW('DL', ROW('Delhi_Truncate'))))", + 4); + + assertThat(loadTable(tableName).newScan().planFiles()).hasSize(2); + List<MaterializedRow> files = computeActual("SELECT file_path, record_count FROM \"" + tableName + "$files\"").getMaterializedRows(); + List<MaterializedRow> partitionedFiles = files.stream() + .filter(file -> ((String) file.getField(0)).contains("country.state.district.name_trunc=")) + .collect(toImmutableList()); + + assertThat(partitionedFiles).hasSize(2); + assertThat(partitionedFiles.stream().mapToLong(row -> (long) row.getField(1)).sum()).isEqualTo(4L); + + assertQuery("SELECT id, country.state.district.name, country.state.name, country.name FROM " + tableName, "VALUES " + + "(1, 'Patna', 'BH', 'India'), " + + "(2, 'Patna_Truncate', 'BH', 'India'), " + + "(3, 'Delhi', 'DL', 'India'), " + + "(4,
'Delhi_Truncate', 'DL', 'India')"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testHighlyNestedFieldPartitioningWithBucketTransform() + { + String tableName = "test_highly_nested_field_partitioning_with_transform_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id INT, country ROW(name VARCHAR, state ROW(name VARCHAR, district ROW(name VARCHAR))))" + + " WITH (partitioning = ARRAY['bucket(\"country.state.district.name\", 2)'])"); + + assertUpdate( + "INSERT INTO " + tableName + " VALUES " + + "(1, ROW('India', ROW('BH', ROW('Patna')))), " + + "(2, ROW('India', ROW('MH', ROW('Mumbai')))), " + + "(3, ROW('India', ROW('DL', ROW('Delhi')))), " + + "(4, ROW('India', ROW('KA', ROW('Bengaluru'))))", + 4); + + assertThat(loadTable(tableName).newScan().planFiles()).hasSize(2); + List<MaterializedRow> files = computeActual("SELECT file_path, record_count FROM \"" + tableName + "$files\"").getMaterializedRows(); + List<MaterializedRow> partitionedFiles = files.stream() + .filter(file -> ((String) file.getField(0)).contains("country.state.district.name_bucket=")) + .collect(toImmutableList()); + + assertThat(partitionedFiles).hasSize(2); + assertThat(partitionedFiles.stream().mapToLong(row -> (long) row.getField(1)).sum()).isEqualTo(4L); + + assertQuery("SELECT id, country.state.district.name, country.state.name, country.name FROM " + tableName, "VALUES " + + "(1, 'Patna', 'BH', 'India'), " + + "(2, 'Mumbai', 'MH', 'India'), " + + "(3, 'Delhi', 'DL', 'India'), " + + "(4, 'Bengaluru', 'KA', 'India')"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testHighlyNestedFieldPartitioningWithTimestampTransform() + { + testHighlyNestedFieldPartitioningWithTimestampTransform( + "ARRAY['year(\"grandparent.parent.ts\")']", + ".*?(grandparent\\.parent\\.ts_year=.*/).*", + ImmutableSet.of("grandparent.parent.ts_year=2021/", "grandparent.parent.ts_year=2022/", "grandparent.parent.ts_year=2023/")); +
testHighlyNestedFieldPartitioningWithTimestampTransform( + "ARRAY['month(\"grandparent.parent.ts\")']", + ".*?(grandparent\\.parent\\.ts_month=.*/).*", + ImmutableSet.of("grandparent.parent.ts_month=2021-01/", "grandparent.parent.ts_month=2022-02/", "grandparent.parent.ts_month=2023-03/")); + testHighlyNestedFieldPartitioningWithTimestampTransform( + "ARRAY['day(\"grandparent.parent.ts\")']", + ".*?(grandparent\\.parent\\.ts_day=.*/).*", + ImmutableSet.of("grandparent.parent.ts_day=2021-01-01/", "grandparent.parent.ts_day=2022-02-02/", "grandparent.parent.ts_day=2023-03-03/")); + testHighlyNestedFieldPartitioningWithTimestampTransform( + "ARRAY['hour(\"grandparent.parent.ts\")']", + ".*?(grandparent\\.parent\\.ts_hour=.*/).*", + ImmutableSet.of("grandparent.parent.ts_hour=2021-01-01-01/", "grandparent.parent.ts_hour=2022-02-02-02/", "grandparent.parent.ts_hour=2023-03-03-03/")); + } + + private void testHighlyNestedFieldPartitioningWithTimestampTransform(String partitioning, String partitionDirectoryRegex, Set<String> expectedPartitionDirectories) + { + String tableName = "test_highly_nested_field_partitioning_with_timestamp_transform_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id INTEGER, grandparent ROW(parent ROW(ts TIMESTAMP(6), a INT), b INT)) WITH (partitioning = " + partitioning + ")"); + assertUpdate( + "INSERT INTO " + tableName + " VALUES " + + "(1, ROW(ROW(TIMESTAMP '2021-01-01 01:01:01.111111', 1), 1)), " + + "(2, ROW(ROW(TIMESTAMP '2022-02-02 02:02:02.222222', 2), 2)), " + + "(3, ROW(ROW(TIMESTAMP '2023-03-03 03:03:03.333333', 3), 3)), " + + "(4, ROW(ROW(TIMESTAMP '2022-02-02 02:04:04.444444', 4), 4))", + 4); + + assertThat(loadTable(tableName).newScan().planFiles()).hasSize(3); + Set<String> partitionedDirectories = computeActual("SELECT file_path FROM \"" + tableName + "$files\"") + .getMaterializedRows().stream() + .map(entry -> extractPartitionFolder((String) entry.getField(0), partitionDirectoryRegex)) + .flatMap(Optional::stream) +
.collect(toImmutableSet()); + + assertThat(partitionedDirectories).isEqualTo(expectedPartitionDirectories); + + assertQuery("SELECT id, grandparent.parent.ts, grandparent.parent.a, grandparent.b FROM " + tableName, "VALUES " + + "(1, '2021-01-01 01:01:01.111111', 1, 1), " + + "(2, '2022-02-02 02:02:02.222222', 2, 2), " + + "(3, '2023-03-03 03:03:03.333333', 3, 3), " + + "(4, '2022-02-02 02:04:04.444444', 4, 4)"); + + assertUpdate("DROP TABLE " + tableName); + } + + private Optional extractPartitionFolder(String file, String regex) + { + Pattern pattern = Pattern.compile(regex); + Matcher matcher = pattern.matcher(file); + if (matcher.matches()) { + return Optional.of(matcher.group(1)); + } + return Optional.empty(); + } + private void writeEqualityDeleteToNationTable(Table icebergTable) throws Exception { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestPartitionFields.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestPartitionFields.java index 77ee270160a4..5b4574b4a6e8 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestPartitionFields.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestPartitionFields.java @@ -16,6 +16,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.DoubleType; import org.apache.iceberg.types.Types.ListType; import org.apache.iceberg.types.Types.LongType; @@ -67,8 +68,17 @@ public void testParse() assertParse("void(\"quoted field\")", partitionSpec(builder -> builder.alwaysNull("quoted field"))); assertParse("truncate(\"\"\"another\"\" \"\"quoted\"\" \"\"field\"\"\", 13)", partitionSpec(builder -> builder.truncate("\"another\" \"quoted\" \"field\"", 13))); assertParse("void(\"\"\"another\"\" \"\"quoted\"\" \"\"field\"\"\")", partitionSpec(builder -> builder.alwaysNull("\"another\" 
\"quoted\" \"field\""))); + assertParse("\"nested.value\"", partitionSpec(builder -> builder.identity("nested.value"))); + assertParse("year(\"nested.ts\")", partitionSpec(builder -> builder.year("nested.ts"))); + assertParse("month(\"nested.ts\")", partitionSpec(builder -> builder.month("nested.ts"))); + assertParse("day(\"nested.ts\")", partitionSpec(builder -> builder.day("nested.ts"))); + assertParse("hour(\"nested.nested.ts\")", partitionSpec(builder -> builder.hour("nested.nested.ts"))); + assertParse("truncate(\"nested.nested.value\", 13)", partitionSpec(builder -> builder.truncate("nested.nested.value", 13))); + assertParse("bucket(\"nested.nested.value\", 42)", partitionSpec(builder -> builder.bucket("nested.nested.value", 42))); + assertParse("void(\"nested.nested.value\")", partitionSpec(builder -> builder.alwaysNull("nested.nested.value"))); assertInvalid("bucket()", "Invalid partition field declaration: bucket()"); + assertInvalid(".nested", "Invalid partition field declaration: .nested"); assertInvalid("abc", "Cannot find source column: abc"); assertInvalid("notes", "Cannot partition by non-primitive source field: list"); assertInvalid("bucket(price, 42)", "Invalid source type double for transform: bucket[42]"); @@ -83,6 +93,7 @@ public void testParse() assertInvalid("\"ABC\"", "Uppercase characters in identifier '\"ABC\"' are not supported."); assertInvalid("year(ABC)", "Cannot find source column: abc"); assertInvalid("bucket(\"ABC\", 12)", "Uppercase characters in identifier '\"ABC\"' are not supported."); + assertInvalid("\"nested.list\"", "Cannot partition by non-primitive source field: list"); } private static void assertParse(String value, PartitionSpec expected, String canonicalRepresentation) @@ -122,7 +133,14 @@ private static PartitionSpec partitionSpec(Consumer consu NestedField.optional(5, "notes", ListType.ofRequired(6, StringType.get())), NestedField.optional(7, "quoted field", StringType.get()), NestedField.optional(8, "quoted ts", 
TimestampType.withoutZone()), - NestedField.optional(9, "\"another\" \"quoted\" \"field\"", StringType.get())); + NestedField.optional(9, "\"another\" \"quoted\" \"field\"", StringType.get()), + NestedField.required(10, "nested", Types.StructType.of( + NestedField.required(12, "value", StringType.get()), + NestedField.required(13, "ts", TimestampType.withZone()), + NestedField.required(14, "list", ListType.ofRequired(15, StringType.get())), + NestedField.required(16, "nested", Types.StructType.of( + NestedField.required(17, "value", StringType.get()), + NestedField.required(18, "ts", TimestampType.withZone())))))); PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); consumer.accept(builder); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java index 0ad7f1f2f3d4..60a56d20f06e 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java @@ -427,6 +427,161 @@ public void testTrinoReadsSparkPartitionedTable(StorageFormat storageFormat, int onSpark().executeQuery("DROP TABLE " + sparkTableName); } + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormats") + public void testSparkReadsTrinoNestedPartitionedTable(StorageFormat storageFormat) + { + String baseTableName = toLowerCase("test_spark_reads_trino_nested_partitioned_table_" + storageFormat + randomNameSuffix()); + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + + onTrino().executeQuery(format( + "CREATE TABLE %s (_string VARCHAR, _bigint BIGINT, _struct ROW(_field INT, _another_field VARCHAR))" + + " WITH (partitioning = 
ARRAY['\"_struct._field\"'], format = '%s')", + trinoTableName, + storageFormat)); + onTrino().executeQuery(format( + "INSERT INTO %s VALUES" + + " ('update', 1001, ROW(1, 'x'))," + + " ('b', 1002, ROW(2, 'y'))," + + " ('c', 1003, ROW(3, 'z'))", + trinoTableName)); + + onTrino().executeQuery("UPDATE " + trinoTableName + " SET _string = 'a' WHERE _struct._field = 1"); + onTrino().executeQuery("DELETE FROM " + trinoTableName + " WHERE _struct._another_field = 'y'"); + onTrino().executeQuery("ALTER TABLE " + trinoTableName + " EXECUTE OPTIMIZE"); + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE " + trinoTableName + " DROP COLUMN _struct._field")) + .hasMessageContaining("Cannot drop partition field: _struct._field"); + + List expectedRows = ImmutableList.of( + row("a", 1001, 1, "x"), + row("c", 1003, 3, "z")); + String select = "SELECT _string, _bigint, _struct._field, _struct._another_field FROM %s" + + " WHERE _struct._field = 1 OR _struct._another_field = 'z'"; + + assertThat(onTrino().executeQuery(format(select, trinoTableName))) + .containsOnly(expectedRows); + assertThat(onSpark().executeQuery(format(select, sparkTableName))) + .containsOnly(expectedRows); + + onTrino().executeQuery("DROP TABLE " + trinoTableName); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormats") + public void testTrinoReadsSparkNestedPartitionedTable(StorageFormat storageFormat) + { + String baseTableName = toLowerCase("test_trino_reads_spark_nested_partitioned_table_" + storageFormat + randomNameSuffix()); + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + + onSpark().executeQuery(format( + "CREATE TABLE %s (_string STRING, _varbinary BINARY, _bigint BIGINT, _struct STRUCT<_field:INT, _another_field:STRING>)" + + " USING ICEBERG PARTITIONED BY (_struct._field) TBLPROPERTIES ('write.format.default'='%s', 'format-version' = 2)", + sparkTableName, + 
storageFormat)); + onSpark().executeQuery(format( + "INSERT INTO %s VALUES" + + " ('update', X'0ff102f0feff', 1001, named_struct('_field', 1, '_another_field', 'x'))," + + " ('b', X'0ff102f0fefe', 1002, named_struct('_field', 2, '_another_field', 'y'))," + + " ('c', X'0ff102fdfeff', 1003, named_struct('_field', 3, '_another_field', 'z'))", + sparkTableName)); + + onSpark().executeQuery("UPDATE " + sparkTableName + " SET _string = 'a' WHERE _struct._field = 1"); + assertThatThrownBy(() -> onSpark().executeQuery("DELETE FROM " + sparkTableName + " WHERE _struct._another_field = 'y'")) + .hasMessageContaining("Cannot filter by nested column: 6: _another_field: optional string"); + assertQueryFailure(() -> onSpark().executeQuery("ALTER TABLE " + sparkTableName + " DROP COLUMN _struct._field")) + .hasMessageContaining("Cannot find source column for partition field: 1000: _struct._field: identity(5)"); + + Row[] expectedRows = new Row[] { + row("a", new byte[] {15, -15, 2, -16, -2, -1}, 1001, 1, "x"), + row("c", new byte[] {15, -15, 2, -3, -2, -1}, 1003, 3, "z") + }; + String select = "SELECT _string, _varbinary, _bigint, _struct._field, _struct._another_field FROM %s" + + " WHERE _struct._field = 1 OR _struct._another_field = 'z'"; + + assertThat(onTrino().executeQuery(format(select, trinoTableName))) + .containsOnly(expectedRows); + assertThat(onSpark().executeQuery(format(select, sparkTableName))) + .containsOnly(expectedRows); + + onSpark().executeQuery("DROP TABLE " + sparkTableName); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormats") + public void testSparkReadsTrinoNestedPartitionedTableWithOneFieldStruct(StorageFormat storageFormat) + { + String baseTableName = toLowerCase("test_spark_reads_trino_nested_partitioned_table_with_one_field_struct_" + storageFormat + randomNameSuffix()); + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + 
+ onTrino().executeQuery(format( + "CREATE TABLE %s (_string VARCHAR, _bigint BIGINT, _struct ROW(_field BIGINT))" + + " WITH (partitioning = ARRAY['\"_struct._field\"'], format = '%s')", + trinoTableName, + storageFormat)); + onTrino().executeQuery(format( + "INSERT INTO %s VALUES" + + " ('a', 1001, ROW(1))," + + " ('b', 1002, ROW(2))," + + " ('c', 1003, ROW(3))", + trinoTableName)); + + Row expectedRow = row("a", 1001, 1); + String select = "SELECT _string, _bigint, _struct._field FROM %s WHERE _string = 'a'"; + + assertThat(onTrino().executeQuery(format(select, trinoTableName))) + .containsOnly(expectedRow); + + if (storageFormat == StorageFormat.ORC) { + // Open iceberg issue https://github.com/apache/iceberg/issues/3139 to read ORC table with nested partition column + assertThatThrownBy(() -> onSpark().executeQuery(format(select, sparkTableName))) + .hasMessageContaining("java.lang.IndexOutOfBoundsException: Index 2 out of bounds for length 2"); + } + else { + assertThat(onSpark().executeQuery(format(select, sparkTableName))) + .containsOnly(expectedRow); + } + + onTrino().executeQuery("DROP TABLE " + trinoTableName); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormats") + public void testTrinoReadsSparkNestedPartitionedTableWithOneFieldStruct(StorageFormat storageFormat) + { + String baseTableName = toLowerCase("test_trino_reads_spark_nested_partitioned_table_with_one_field_struct_" + storageFormat + randomNameSuffix()); + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + + onSpark().executeQuery(format( + "CREATE TABLE %s (_string STRING, _bigint BIGINT, _struct STRUCT<_field:STRING>)" + + " USING ICEBERG PARTITIONED BY (_struct._field) TBLPROPERTIES ('write.format.default'='%s', 'format-version' = 2)", + sparkTableName, + storageFormat)); + onSpark().executeQuery(format( + "INSERT INTO %s VALUES" + + " ('a', 1001, 
named_struct('_field', 'field1'))," + + " ('b', 1002, named_struct('_field', 'field2'))," + + " ('c', 1003, named_struct('_field', 'field3'))", + sparkTableName)); + + Row expectedRow = row("a", 1001, "field1"); + String selectNested = "SELECT _string, _bigint, _struct._field FROM %s WHERE _struct._field = 'field1'"; + + assertThat(onTrino().executeQuery(format(selectNested, trinoTableName))) + .containsOnly(expectedRow); + + if (storageFormat == StorageFormat.ORC) { + // Open iceberg issue https://github.com/apache/iceberg/issues/3139 to read ORC table with nested partition column + assertThatThrownBy(() -> onSpark().executeQuery(format(selectNested, sparkTableName))) + .hasMessageContaining("java.lang.IndexOutOfBoundsException: Index 2 out of bounds for length 2"); + } + else { + assertThat(onSpark().executeQuery(format(selectNested, sparkTableName))) + .containsOnly(expectedRow); + } + + onSpark().executeQuery("DROP TABLE " + sparkTableName); + } + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") public void testTrinoPartitionedByRealWithNaN(StorageFormat storageFormat) { @@ -484,38 +639,6 @@ private void testSparkPartitionedByNaN(String typeName, StorageFormat storageFor onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) - public void testPartitionedByNestedField() - { - String baseTableName = "test_trino_nested_field_partition_" + randomNameSuffix(); - String trinoTableName = trinoTableName(baseTableName); - String sparkTableName = sparkTableName(baseTableName); - - onSpark().executeQuery(format("" + - "CREATE TABLE %s (" + - " id INT," + - " parent STRUCT)" + - " USING ICEBERG" + - " PARTITIONED BY (parent.nested)" + - " TBLPROPERTIES ('format-version'=2)", - sparkTableName)); - - assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (2, ROW('b'))")) - .hasMessageContaining("Partitioning by nested field is unsupported: 
parent.nested"); - assertQueryFailure(() -> onTrino().executeQuery("UPDATE " + trinoTableName + " SET id = 2")) - .hasMessageContaining("Partitioning by nested field is unsupported: parent.nested"); - assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM " + trinoTableName)) - .hasMessageContaining("Partitioning by nested field is unsupported: parent.nested"); - assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO " + trinoTableName + " t USING " + trinoTableName + " s ON (t.id = s.id) WHEN MATCHED THEN UPDATE SET id = 2")) - .hasMessageContaining("Partitioning by nested field is unsupported: parent.nested"); - assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE " + trinoTableName + " EXECUTE OPTIMIZE")) - .hasMessageContaining("Partitioning by nested field is unsupported: parent.nested"); - assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE " + trinoTableName + " DROP COLUMN parent.nested")) - .hasMessageContaining("Cannot drop partition field: parent.nested"); - - onSpark().executeQuery("DROP TABLE " + sparkTableName); - } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadingCompositeSparkData(StorageFormat storageFormat, int specVersion) {