Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.iceberg.PartitionTransforms.ValueTransform;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.RowBlock;
import io.trino.spi.connector.BucketFunction;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;

import java.lang.invoke.MethodHandle;
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

Expand All @@ -32,6 +35,7 @@
import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL;
import static io.trino.spi.function.InvocationConvention.simpleConvention;
import static io.trino.spi.type.TypeUtils.NULL_HASH_CODE;
import static io.trino.spi.type.TypeUtils.readNativeValue;
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse;

Expand All @@ -40,6 +44,7 @@ public class IcebergBucketFunction
{
private final int bucketCount;
private final List<HashFunction> functions;
private final List<Integer> partitionStructFields;

private final boolean singleBucketFunction;

Expand All @@ -54,6 +59,7 @@ public IcebergBucketFunction(IcebergPartitioningHandle partitioningHandle, TypeO
this.functions = partitionFunctions.stream()
.map(partitionFunction -> HashFunction.create(partitionFunction, typeOperators))
.collect(toImmutableList());
this.partitionStructFields = ImmutableList.copyOf(partitioningHandle.partitionStructFields());

this.singleBucketFunction = partitionFunctions.size() == 1 &&
partitionFunctions.getFirst().transform() == BUCKET &&
Expand Down Expand Up @@ -81,8 +87,7 @@ public int getBucket(Page page, int position)
@Override
public int applyAsInt(ConnectorSplit split)
{
List<Object> partitionValues = ((IcebergSplit) split).getPartitionValues()
.orElseThrow(() -> new IllegalArgumentException("Split does not contain partition values"));
List<Object> partitionValues = getPartitionValues(((IcebergSplit) split).getPartitionValues());

if (singleBucketFunction) {
long bucket = (long) requireNonNullElse(partitionValues.getFirst(), 0L);
Expand All @@ -99,15 +104,27 @@ public int applyAsInt(ConnectorSplit split)
return (int) ((hash & Long.MAX_VALUE) % bucketCount);
}

private record HashFunction(List<Integer> dataPath, ValueTransform valueTransform, MethodHandle hashCodeOperator)
private List<Object> getPartitionValues(List<Block> partitionBlocks)
{
// using array list because the value could be null
List<Object> partitionValues = new ArrayList<>(partitionStructFields.size());
Comment thread
chenjian2664 marked this conversation as resolved.
for (int i = 0; i < partitionStructFields.size(); i++) {
int fieldIndex = partitionStructFields.get(i);
partitionValues.add(readNativeValue(functions.get(i).resultType(), partitionBlocks.get(fieldIndex), 0));
}
return partitionValues;
}

private record HashFunction(List<Integer> dataPath, ValueTransform valueTransform, MethodHandle hashCodeOperator, Type resultType)
{
private static HashFunction create(IcebergPartitionFunction partitionFunction, TypeOperators typeOperators)
{
PartitionTransforms.ColumnTransform columnTransform = PartitionTransforms.getColumnTransform(partitionFunction);
return new HashFunction(
partitionFunction.dataPath(),
columnTransform.valueTransform(),
typeOperators.getHashCodeOperator(columnTransform.type(), simpleConvention(FAIL_ON_NULL, NEVER_NULL)));
typeOperators.getHashCodeOperator(columnTransform.type(), simpleConvention(FAIL_ON_NULL, NEVER_NULL)),
columnTransform.type());
}

private HashFunction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ private Optional<IcebergTablePartitioning> getTablePartitioning(ConnectorSession
return Optional.empty();
}
Comment thread
chenjian2664 marked this conversation as resolved.

IcebergPartitioningHandle partitioningHandle = IcebergPartitioningHandle.create(partitionSpec, typeManager, List.of());
IcebergPartitioningHandle partitioningHandle = IcebergPartitioningHandle.create(partitionSpec, typeManager);

Map<Integer, IcebergColumnHandle> partitionColumnById = getPartitionColumns(icebergTable, typeManager).stream()
.collect(toImmutableMap(IcebergColumnHandle::getId, identity()));
Expand All @@ -847,8 +847,7 @@ private Optional<IcebergTablePartitioning> getTablePartitioning(ConnectorSession
return Optional.of(new IcebergTablePartitioning(
false,
partitioningHandle,
partitionColumns,
IntStream.range(0, partitioningHandle.partitionFunctions().size()).boxed().collect(toImmutableList())));
partitionColumns));
}

private static long getSnapshotIdFromVersion(ConnectorSession session, Table table, ConnectorTableVersion version)
Expand Down Expand Up @@ -1074,11 +1073,11 @@ public Optional<ConnectorTableHandle> applyPartitioning(ConnectorSession session
// Change the index of the top level column to the location in the new partitioning columns
newPartitionFunctions.add(function.withTopLevelColumnIndex(newColumnIndex));
// Some partition functions may be dropped so update the struct fields used in split partitioning must be updated
newPartitionStructFields.add(tablePartitioning.partitionStructFields().get(functionIndex));
newPartitionStructFields.add(tablePartitioning.partitioningHandle().partitionStructFields().get(functionIndex));
}
}

IcebergPartitioningHandle newPartitioningHandle = new IcebergPartitioningHandle(false, newPartitionFunctions.build());
IcebergPartitioningHandle newPartitioningHandle = new IcebergPartitioningHandle(false, newPartitionFunctions.build(), newPartitionStructFields.build());
if (partitioningHandle.isPresent() && !partitioningHandle.get().equals(newPartitioningHandle)) {
// todo if bucketing is a power of two, we can adapt the bucketing
return Optional.empty();
Expand All @@ -1092,8 +1091,7 @@ public Optional<ConnectorTableHandle> applyPartitioning(ConnectorSession session
return Optional.of(icebergTableHandle.withTablePartitioning(Optional.of(new IcebergTablePartitioning(
true,
newPartitioningHandle,
partitioningColumns.stream().map(IcebergColumnHandle.class::cast).collect(toImmutableList()),
newPartitionStructFields.build()))));
partitioningColumns.stream().map(IcebergColumnHandle.class::cast).collect(toImmutableList())))));
}

@Override
Expand Down Expand Up @@ -1588,7 +1586,7 @@ private Optional<ConnectorTableLayout> getWriteLayout(Schema tableSchema, Partit
// Do not set partitioningHandle, to let engine determine whether to repartition data or not, on stat-based basis.
return Optional.of(new ConnectorTableLayout(partitioningColumnNames));
}
IcebergPartitioningHandle partitioningHandle = IcebergPartitioningHandle.create(partitionSpec, typeManager, List.of());
IcebergPartitioningHandle partitioningHandle = IcebergPartitioningHandle.create(partitionSpec, typeManager);
return Optional.of(new ConnectorTableLayout(partitioningHandle, partitioningColumnNames, true));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ public ConnectorPageSource createPageSource(
icebergColumns,
schema,
partitionSpec,
PartitionData.fromJson(split.getPartitionDataJson(), partitionColumnTypes),
PartitionData.fromBlocks(split.getPartitionValues(), partitionColumnTypes, typeManager),
split.getDeletes(),
dynamicFilter,
tableHandle.getUnenforcedPredicate(),
Expand All @@ -294,7 +294,6 @@ public ConnectorPageSource createPageSource(
split.getLength(),
split.getFileSize(),
split.getFileRecordCount(),
split.getPartitionDataJson(),
split.getFileFormat(),
getFileIoProperties(connectorTableCredentials),
split.getDataSequenceNumber(),
Expand All @@ -317,7 +316,6 @@ public ConnectorPageSource createPageSource(
long length,
long fileSize,
long fileRecordCount,
String partitionDataJson,
IcebergFileFormat fileFormat,
Map<String, String> fileIoProperties,
long dataSequenceNumber,
Expand Down Expand Up @@ -370,7 +368,7 @@ public ConnectorPageSource createPageSource(
length,
fileSize,
partitionSpec.specId(),
partitionDataJson,
PartitionData.toJson(partitionData),
Comment thread
chenjian2664 marked this conversation as resolved.
fileFormat,
tableSchema,
requiredColumns,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,31 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

public record IcebergPartitioningHandle(boolean update, List<IcebergPartitionFunction> partitionFunctions)
public record IcebergPartitioningHandle(boolean update, List<IcebergPartitionFunction> partitionFunctions, List<Integer> partitionStructFields)
implements ConnectorPartitioningHandle
{
public IcebergPartitioningHandle
{
partitionFunctions = ImmutableList.copyOf(requireNonNull(partitionFunctions, "partitioning is null"));
partitionStructFields = ImmutableList.copyOf(partitionStructFields);
checkArgument(partitionFunctions.size() == partitionStructFields.size(), "partitionFunctions and partitionStructFields must have the same size");
}

public IcebergPartitioningHandle forUpdate()
{
return new IcebergPartitioningHandle(true, partitionFunctions);
return new IcebergPartitioningHandle(true, partitionFunctions, partitionStructFields);
}

public static IcebergPartitioningHandle create(PartitionSpec spec, TypeManager typeManager, List<IcebergColumnHandle> partitioningColumns)
public static IcebergPartitioningHandle create(PartitionSpec spec, TypeManager typeManager)
{
Map<Integer, List<Integer>> dataPaths = buildDataPaths(spec);
List<IcebergPartitionFunction> partitionFields = spec.fields().stream()
Expand All @@ -59,7 +63,7 @@ public static IcebergPartitioningHandle create(PartitionSpec spec, TypeManager t
toTrinoType(spec.schema().findType(field.sourceId()), typeManager)))
.collect(toImmutableList());

return new IcebergPartitioningHandle(false, partitionFields);
return new IcebergPartitioningHandle(false, partitionFields, IntStream.range(0, partitionFields.size()).boxed().collect(toImmutableList()));
Comment thread
raunaqmorarka marked this conversation as resolved.
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import io.trino.plugin.iceberg.delete.DeleteFile;
import io.trino.spi.HostAddress;
import io.trino.spi.SplitWeight;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.predicate.TupleDomain;

import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand All @@ -46,9 +46,8 @@ public class IcebergSplit
private final long fileSize;
private final long fileRecordCount;
private final IcebergFileFormat fileFormat;
private final Optional<List<Object>> partitionValues;
private final int specId;
private final String partitionDataJson;
private final List<Block> partitionValues;
private final List<DeleteFile> deletes;
private final SplitWeight splitWeight;
private final TupleDomain<IcebergColumnHandle> fileStatisticsDomain;
Expand All @@ -65,7 +64,7 @@ public IcebergSplit(
@JsonProperty("fileRecordCount") long fileRecordCount,
@JsonProperty("fileFormat") IcebergFileFormat fileFormat,
@JsonProperty("specId") int specId,
@JsonProperty("partitionDataJson") String partitionDataJson,
@JsonProperty("partitionValues") List<Block> partitionValues,
@JsonProperty("deletes") List<DeleteFile> deletes,
@JsonProperty("splitWeight") SplitWeight splitWeight,
@JsonProperty("fileStatisticsDomain") TupleDomain<IcebergColumnHandle> fileStatisticsDomain,
Expand All @@ -79,9 +78,8 @@ public IcebergSplit(
fileSize,
fileRecordCount,
fileFormat,
Optional.empty(),
specId,
partitionDataJson,
partitionValues,
deletes,
splitWeight,
fileStatisticsDomain,
Expand All @@ -97,9 +95,8 @@ public IcebergSplit(
long fileSize,
long fileRecordCount,
IcebergFileFormat fileFormat,
Optional<List<Object>> partitionValues,
int specId,
String partitionDataJson,
List<Block> partitionValues,
List<DeleteFile> deletes,
SplitWeight splitWeight,
TupleDomain<IcebergColumnHandle> fileStatisticsDomain,
Expand All @@ -113,9 +110,8 @@ public IcebergSplit(
this.fileSize = fileSize;
this.fileRecordCount = fileRecordCount;
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.partitionValues = requireNonNull(partitionValues, "partitionValues is null");
this.specId = specId;
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
this.partitionValues = ImmutableList.copyOf(partitionValues);
this.deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null"));
this.splitWeight = requireNonNull(splitWeight, "splitWeight is null");
this.fileStatisticsDomain = requireNonNull(fileStatisticsDomain, "fileStatisticsDomain is null");
Expand Down Expand Up @@ -173,20 +169,10 @@ public int getSpecId()
return specId;
}

/**
* Trino (stack) values of the partition columns. The values are the result of evaluating
* the partition expressions on the partition data.
*/
@JsonIgnore
public Optional<List<Object>> getPartitionValues()
{
return partitionValues;
}

@JsonProperty
public String getPartitionDataJson()
public List<Block> getPartitionValues()
{
return partitionDataJson;
return partitionValues;
}

@JsonProperty
Expand Down Expand Up @@ -227,7 +213,7 @@ public long getRetainedSizeInBytes()
+ estimatedSizeOf(path)
+ SIZE_OF_LONG * 4 // start, length, fileSize, fileRecordCount
+ SIZE_OF_INT // specId
+ estimatedSizeOf(partitionDataJson)
+ estimatedSizeOf(partitionValues, Block::getRetainedSizeInBytes)
+ estimatedSizeOf(deletes, DeleteFile::retainedSizeInBytes)
+ splitWeight.getRetainedSizeInBytes()
+ fileStatisticsDomain.getRetainedSizeInBytes(IcebergColumnHandle::getRetainedSizeInBytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.trino.plugin.iceberg.delete.DeleteFile;
import io.trino.plugin.iceberg.util.DataFileWithDeleteFiles;
import io.trino.spi.SplitWeight;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
Expand Down Expand Up @@ -116,8 +117,10 @@
import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes;
import static io.trino.plugin.iceberg.StructLikeWrapperWithFieldIdToIndex.createStructLikeWrapper;
import static io.trino.plugin.iceberg.TypeConverter.toIcebergType;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TypeUtils.writeNativeValue;
import static java.lang.Math.clamp;
import static java.util.Collections.emptyIterator;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -725,18 +728,6 @@ static boolean partitionMatchesPredicate(
private IcebergSplit toIcebergSplit(FileScanTaskWithDomain taskWithDomain)
{
FileScanTask task = taskWithDomain.fileScanTask();
Optional<List<Object>> partitionValues = Optional.empty();
if (tableHandle.getTablePartitioning().isPresent()) {
PartitionSpec partitionSpec = task.spec();
StructLike partition = task.file().partition();
List<PartitionField> fields = partitionSpec.fields();

partitionValues = Optional.of(tableHandle.getTablePartitioning().get().partitionStructFields().stream()
.map(fieldIndex -> convertIcebergValueToTrino(
partitionSpec.partitionType().field(fields.get(fieldIndex).fieldId()).type(),
partition.get(fieldIndex, Object.class)))
.toList());
}

return new IcebergSplit(
task.file().location(),
Expand All @@ -745,9 +736,8 @@ private IcebergSplit toIcebergSplit(FileScanTaskWithDomain taskWithDomain)
task.file().fileSizeInBytes(),
Comment thread
raunaqmorarka marked this conversation as resolved.
task.file().recordCount(),
IcebergFileFormat.fromIceberg(task.file().format()),
partitionValues,
task.spec().specId(),
PartitionData.toJson(task.file().partition()),
getPartitionBlockValues(task, typeManager),
task.deletes().stream()
.peek(file -> verifyDeletionVectorReferencesDataFile(task, file))
.map(DeleteFile::fromIceberg)
Expand All @@ -759,6 +749,21 @@ private IcebergSplit toIcebergSplit(FileScanTaskWithDomain taskWithDomain)
task.file().firstRowId() == null ? OptionalLong.empty() : OptionalLong.of(task.file().firstRowId()));
}

private static List<Block> getPartitionBlockValues(FileScanTask task, TypeManager typeManager)
{
PartitionSpec spec = task.spec();
StructLike partition = task.file().partition();
List<PartitionField> fields = spec.fields();

ImmutableList.Builder<Block> partitionValues = ImmutableList.builder();
for (int fieldIndex = 0; fieldIndex < fields.size(); fieldIndex++) {
Type icebergType = spec.partitionType().field(fields.get(fieldIndex).fieldId()).type();
Object partitionValue = convertIcebergValueToTrino(icebergType, partition.get(fieldIndex, Object.class));
partitionValues.add(writeNativeValue(toTrinoType(icebergType, typeManager), partitionValue));
}
return partitionValues.build();
}

private static void verifyDeletionVectorReferencesDataFile(FileScanTask task, org.apache.iceberg.DeleteFile deleteFile)
{
if (deleteFile.format() != FileFormat.PUFFIN || deleteFile.contentOffset() == null || deleteFile.contentSizeInBytes() == null) {
Expand Down
Loading