Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,18 @@ WITH (
orc_bloom_filter_fpp = 0.05)
```

The following table definition uses Avro files and partitions the table
by the `child1` field of the `parent` column:

```
CREATE TABLE test_table (
data INTEGER,
parent ROW(child1 DOUBLE, child2 INTEGER))
WITH (
format = 'AVRO',
partitioning = ARRAY['"parent.child1"'])
```

(iceberg-metadata-tables)=
#### Metadata tables

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,32 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.iceberg.PartitionTransforms.ColumnTransform;
import io.trino.plugin.iceberg.PartitionTransforms.ValueTransform;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.connector.BucketFunction;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;
import org.apache.iceberg.PartitionSpec;

import java.lang.invoke.MethodHandle;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.iceberg.PartitionTransforms.getColumnTransform;
import static io.trino.spi.block.RowBlock.getRowFieldsFromBlock;
import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.NEVER_NULL;
import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL;
import static io.trino.spi.function.InvocationConvention.simpleConvention;
import static io.trino.spi.type.TypeUtils.NULL_HASH_CODE;
import static java.lang.String.format;
Comment thread
raunaqmorarka marked this conversation as resolved.
Outdated
import static java.util.Objects.requireNonNull;

public class IcebergBucketFunction
Expand All @@ -59,18 +63,14 @@ public IcebergBucketFunction(

this.bucketCount = bucketCount;

Map<Integer, Integer> fieldIdToInputChannel = new HashMap<>();
for (int i = 0; i < partitioningColumns.size(); i++) {
Integer previous = fieldIdToInputChannel.put(partitioningColumns.get(i).getId(), i);
checkState(previous == null, "Duplicate id %s in %s at %s and %s", partitioningColumns.get(i).getId(), partitioningColumns, i, previous);
}
Map<String, FieldInfo> nameToFieldInfo = buildNameToFieldInfo(partitioningColumns);
partitionColumns = partitionSpec.fields().stream()
.map(field -> {
Integer channel = fieldIdToInputChannel.get(field.sourceId());
checkArgument(channel != null, "partition field not found: %s", field);
Type inputType = partitioningColumns.get(channel).getType();
ColumnTransform transform = getColumnTransform(field, inputType);
return new PartitionColumn(channel, transform.getValueTransform(), transform.getType());
String fieldName = partitionSpec.schema().findColumnName(field.sourceId());
FieldInfo fieldInfo = nameToFieldInfo.get(fieldName);
checkArgument(fieldInfo != null, "partition field not found: %s", field);
ColumnTransform transform = getColumnTransform(field, fieldInfo.type());
return new PartitionColumn(fieldInfo.sourceChannel(), transform.getValueTransform(), transform.getType(), fieldInfo.path());
})
.collect(toImmutableList());
hashCodeInvokers = partitionColumns.stream()
Expand All @@ -79,6 +79,35 @@ public IcebergBucketFunction(
.collect(toImmutableList());
}

private static void addFieldInfo(
Map<String, FieldInfo> nameToFieldInfo,
int sourceChannel,
String fieldName,
List<ColumnIdentity> children,
Type type,
List<Integer> path)
{
if (type instanceof RowType rowType) {
List<RowType.Field> fields = rowType.getFields();
checkArgument(children.size() == fields.size(), format("children size (%s) == fields size (%s) is not equal", children.size(), fields.size()));
for (int i = 0; i < fields.size(); i++) {
ColumnIdentity child = children.get(i);
String qualifiedFieldName = fieldName + "." + child.getName();
path.add(i);
if (fields.get(i).getType() instanceof RowType) {
addFieldInfo(nameToFieldInfo, sourceChannel, qualifiedFieldName, child.getChildren(), fields.get(i).getType(), path);
}
else {
nameToFieldInfo.put(qualifiedFieldName, new FieldInfo(sourceChannel, qualifiedFieldName, fields.get(i).getType(), ImmutableList.copyOf(path)));
}
path.removeLast();
Comment thread
ebyhr marked this conversation as resolved.
Outdated
}
}
else {
nameToFieldInfo.put(fieldName, new FieldInfo(sourceChannel, fieldName, type, ImmutableList.copyOf(path)));
}
}

@Override
public int getBucket(Page page, int position)
Comment thread
raunaqmorarka marked this conversation as resolved.
Outdated
{
Expand All @@ -87,6 +116,9 @@ public int getBucket(Page page, int position)
for (int i = 0; i < partitionColumns.size(); i++) {
PartitionColumn partitionColumn = partitionColumns.get(i);
Block block = page.getBlock(partitionColumn.getSourceChannel());
for (int index : partitionColumn.getPath()) {
block = getRowFieldsFromBlock(block).get(index);
}
Object value = partitionColumn.getValueTransform().apply(block, position);
long valueHash = hashValue(hashCodeInvokers.get(i), value);
hash = (31 * hash) + valueHash;
Expand All @@ -95,6 +127,22 @@ public int getBucket(Page page, int position)
return (int) ((hash & Long.MAX_VALUE) % bucketCount);
}

/**
 * Builds a lookup from qualified column name (e.g. {@code "parent.child1"}) to the
 * {@link FieldInfo} describing where that value lives: which input channel carries
 * the top-level column and which row-field index path leads to the leaf.
 *
 * @param partitioningColumns top-level partitioning columns, in input channel order
 */
private static Map<String, FieldInfo> buildNameToFieldInfo(List<IcebergColumnHandle> partitioningColumns)
{
    Map<String, FieldInfo> nameToFieldInfo = new HashMap<>();
    for (int channel = 0; channel < partitioningColumns.size(); channel++) {
        IcebergColumnHandle partitionColumn = partitioningColumns.get(channel);
        addFieldInfo(
                nameToFieldInfo,
                channel,
                partitionColumn.getName(),
                partitionColumn.getColumnIdentity().getChildren(),
                partitionColumn.getType(),
                // used as a stack (add/removeLast at the tail only), so ArrayList over LinkedList
                new ArrayList<>());
    }
    return nameToFieldInfo;
}

private static long hashValue(MethodHandle method, Object value)
{
if (value == null) {
Expand All @@ -119,12 +167,14 @@ private static class PartitionColumn
private final int sourceChannel;
private final ValueTransform valueTransform;
private final Type resultType;
private final List<Integer> path;

public PartitionColumn(int sourceChannel, ValueTransform valueTransform, Type resultType)
public PartitionColumn(int sourceChannel, ValueTransform valueTransform, Type resultType, List<Integer> path)
{
this.sourceChannel = sourceChannel;
this.valueTransform = requireNonNull(valueTransform, "valueTransform is null");
this.resultType = requireNonNull(resultType, "resultType is null");
this.path = ImmutableList.copyOf(requireNonNull(path, "path is null"));
}

public int getSourceChannel()
Expand All @@ -141,5 +191,20 @@ public ValueTransform getValueTransform()
{
return valueTransform;
}

public List<Integer> getPath()
{
return path;
}
}

/**
 * Location of one partitionable leaf field: the input page channel of its top-level
 * column, its dot-qualified name, its Trino type, and the path of row-field indexes
 * leading from the top-level block down to the leaf (empty for a top-level column).
 */
private record FieldInfo(int sourceChannel, String name, Type type, List<Integer> path)
{
    FieldInfo
    {
        requireNonNull(name, "name is null");
        requireNonNull(type, "type is null");
        // defensive copy keeps the record immutable even if the caller passes a mutable list
        path = ImmutableList.copyOf(requireNonNull(path, "path is null"));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,13 @@
import static io.trino.plugin.iceberg.IcebergUtil.firstSnapshotAfter;
import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
import static io.trino.plugin.iceberg.IcebergUtil.getColumnMetadatas;
import static io.trino.plugin.iceberg.IcebergUtil.getColumns;
import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
import static io.trino.plugin.iceberg.IcebergUtil.getProjectedColumns;
import static io.trino.plugin.iceberg.IcebergUtil.getSnapshotIdAsOfTime;
import static io.trino.plugin.iceberg.IcebergUtil.getTableComment;
import static io.trino.plugin.iceberg.IcebergUtil.getTopLevelColumns;
import static io.trino.plugin.iceberg.IcebergUtil.newCreateTableTransaction;
import static io.trino.plugin.iceberg.IcebergUtil.schemaFromMetadata;
import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
Expand Down Expand Up @@ -614,7 +615,7 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con
DiscretePredicates discretePredicates = null;
if (!partitionSourceIds.isEmpty()) {
// Extract identity partition columns
Map<Integer, IcebergColumnHandle> columns = getColumns(icebergTable.schema(), typeManager).stream()
Map<Integer, IcebergColumnHandle> columns = getProjectedColumns(icebergTable.schema(), typeManager).stream()
.filter(column -> partitionSourceIds.contains(column.getId()))
.collect(toImmutableMap(IcebergColumnHandle::getId, identity()));

Expand Down Expand Up @@ -715,7 +716,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
{
IcebergTableHandle table = checkValidTableHandle(tableHandle);
ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
for (IcebergColumnHandle columnHandle : getColumns(SchemaParser.fromJson(table.getTableSchemaJson()), typeManager)) {
for (IcebergColumnHandle columnHandle : getTopLevelColumns(SchemaParser.fromJson(table.getTableSchemaJson()), typeManager)) {
columnHandles.put(columnHandle.getName(), columnHandle);
}
columnHandles.put(FILE_PATH.getColumnName(), pathColumnHandle());
Expand Down Expand Up @@ -1041,13 +1042,32 @@ private Optional<ConnectorTableLayout> getWriteLayout(Schema tableSchema, Partit
return Optional.empty();
}

validateNotPartitionedByNestedField(tableSchema, partitionSpec);
Map<Integer, IcebergColumnHandle> columnById = getColumns(tableSchema, typeManager).stream()
Map<Integer, Integer> indexParents = indexParents(tableSchema.asStruct());
Map<Integer, IcebergColumnHandle> columnById = getProjectedColumns(tableSchema, typeManager).stream()
.collect(toImmutableMap(IcebergColumnHandle::getId, identity()));

List<IcebergColumnHandle> partitioningColumns = partitionSpec.fields().stream()
.sorted(Comparator.comparing(PartitionField::sourceId))
.map(field -> requireNonNull(columnById.get(field.sourceId()), () -> "Cannot find source column for partitioning field " + field))
.map(field -> {
boolean isBaseColumn = !indexParents.containsKey(field.sourceId());
int sourceId;
if (isBaseColumn) {
sourceId = field.sourceId();
}
else {
sourceId = getRootFieldId(indexParents, field.sourceId());
}
Type sourceType = tableSchema.findType(sourceId);
// The source column, must be a primitive type and cannot be contained in a map or list, but may be nested in a struct.
// https://iceberg.apache.org/spec/#partitioning
if (sourceType.isMapType()) {
throw new TrinoException(NOT_SUPPORTED, "Partitioning field [" + field.name() + "] cannot be contained in a map");
}
if (sourceType.isListType()) {
throw new TrinoException(NOT_SUPPORTED, "Partitioning field [" + field.name() + "] cannot be contained in a array");
}
Comment thread
krvikash marked this conversation as resolved.
Outdated
return requireNonNull(columnById.get(sourceId), () -> "Cannot find source column for partition field " + field);
})
.distinct()
.collect(toImmutableList());
List<String> partitioningColumnNames = partitioningColumns.stream()
Expand All @@ -1062,14 +1082,22 @@ private Optional<ConnectorTableLayout> getWriteLayout(Schema tableSchema, Partit
return Optional.of(new ConnectorTableLayout(partitioningHandle, partitioningColumnNames, true));
}

/**
 * Follows the nested-field parent chain upward until reaching a field id that has
 * no parent, i.e. the top-level column that (transitively) contains {@code fieldId}.
 * A field id that is not nested is returned unchanged.
 */
private static int getRootFieldId(Map<Integer, Integer> indexParents, int fieldId)
{
    int current = fieldId;
    Integer parent = indexParents.get(current);
    while (parent != null) {
        current = parent;
        parent = indexParents.get(current);
    }
    return current;
}

@Override
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> columns, RetryMode retryMode)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());

validateNotModifyingOldSnapshot(table, icebergTable);
validateNotPartitionedByNestedField(icebergTable.schema(), icebergTable.spec());

beginTransaction(icebergTable);

Expand All @@ -1084,7 +1112,7 @@ private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName name,
transformValues(table.specs(), PartitionSpecParser::toJson),
table.spec().specId(),
getSupportedSortFields(table.schema(), table.sortOrder()),
getColumns(table.schema(), typeManager),
getProjectedColumns(table.schema(), typeManager),
table.location(),
getFileFormat(table),
table.properties(),
Expand Down Expand Up @@ -1330,7 +1358,7 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(
tableHandle.getSnapshotId(),
tableHandle.getTableSchemaJson(),
tableHandle.getPartitionSpecJson().orElseThrow(() -> new VerifyException("Partition spec missing in the table handle")),
getColumns(SchemaParser.fromJson(tableHandle.getTableSchemaJson()), typeManager),
getProjectedColumns(SchemaParser.fromJson(tableHandle.getTableSchemaJson()), typeManager),
icebergTable.sortOrder().fields().stream()
.map(TrinoSortField::fromIceberg)
.collect(toImmutableList()),
Expand Down Expand Up @@ -1431,7 +1459,6 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());

validateNotModifyingOldSnapshot(table, icebergTable);
validateNotPartitionedByNestedField(icebergTable.schema(), icebergTable.spec());

int tableFormatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
if (tableFormatVersion > OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION) {
Expand Down Expand Up @@ -1891,7 +1918,6 @@ private static void updatePartitioning(Table icebergTable, Transaction transacti
}
else {
PartitionSpec partitionSpec = parsePartitionFields(schema, partitionColumns);
validateNotPartitionedByNestedField(schema, partitionSpec);
Set<PartitionField> partitionFields = ImmutableSet.copyOf(partitionSpec.fields());
difference(existingPartitionFields, partitionFields).stream()
.map(PartitionField::name)
Expand Down Expand Up @@ -2372,7 +2398,6 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT

Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
validateNotModifyingOldSnapshot(table, icebergTable);
validateNotPartitionedByNestedField(icebergTable.schema(), icebergTable.spec());

beginTransaction(icebergTable);

Expand Down Expand Up @@ -2403,16 +2428,6 @@ private static void validateNotModifyingOldSnapshot(IcebergTableHandle table, Ta
}
}

/**
 * Rejects partition specs that reference a nested field (a source id that has a parent
 * in the schema's struct), since only top-level columns are supported here.
 *
 * @throws TrinoException with {@code NOT_SUPPORTED} if any partition field is nested
 */
public static void validateNotPartitionedByNestedField(Schema schema, PartitionSpec partitionSpec)
{
    Map<Integer, Integer> fieldToParent = indexParents(schema.asStruct());
    partitionSpec.fields().stream()
            .filter(partitionField -> fieldToParent.containsKey(partitionField.sourceId()))
            .findFirst()
            .ifPresent(partitionField -> {
                throw new TrinoException(NOT_SUPPORTED, "Partitioning by nested field is unsupported: " + partitionField.name());
            });
}

private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection<Slice> fragments, RetryMode retryMode)
{
Table icebergTable = transaction.table();
Expand Down
Loading