Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,347 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.spi.TrinoException;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino;
import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.NEVER_NULL;
import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL;
import static io.trino.spi.function.InvocationConvention.simpleConvention;
import static java.util.Objects.requireNonNull;

/**
 * Immutable aggregate of table statistics collected from Iceberg data files.
 * Instances are produced by {@link Builder}; all maps are keyed by Iceberg field id.
 */
@Immutable
final class IcebergStatistics
{
// Total number of rows across all accepted data files
private final long recordCount;
// Number of data files aggregated
private final long fileCount;
// Total size of all data files, from DataFile.fileSizeInBytes()
private final long size;
// Per-field minimum values (Trino representation); a field is absent when no reliable min is known
private final Map<Integer, Object> minValues;
// Per-field maximum values (Trino representation); a field is absent when no reliable max is known
private final Map<Integer, Object> maxValues;
// Per-field null-value counts; a field is absent when any file lacked the metric
private final Map<Integer, Long> nullCounts;
// Per-field column sizes summed across files, from DataFile.columnSizes()
private final Map<Integer, Long> columnSizes;

private IcebergStatistics(
long recordCount,
long fileCount,
long size,
Map<Integer, Object> minValues,
Map<Integer, Object> maxValues,
Map<Integer, Long> nullCounts,
Map<Integer, Long> columnSizes)
{
this.recordCount = recordCount;
this.fileCount = fileCount;
this.size = size;
// Defensive immutable copies; ImmutableMap.copyOf also rejects null keys/values
this.minValues = ImmutableMap.copyOf(requireNonNull(minValues, "minValues is null"));
this.maxValues = ImmutableMap.copyOf(requireNonNull(maxValues, "maxValues is null"));
this.nullCounts = ImmutableMap.copyOf(requireNonNull(nullCounts, "nullCounts is null"));
this.columnSizes = ImmutableMap.copyOf(requireNonNull(columnSizes, "columnSizes is null"));
}

/** Returns the total number of rows across all accepted data files. */
public long getRecordCount()
{
return recordCount;
}

/** Returns the number of data files that were aggregated. */
public long getFileCount()
{
return fileCount;
}

/** Returns the total size of all data files in bytes. */
public long getSize()
{
return size;
}

/** Returns per-field minimum values, keyed by Iceberg field id; fields without a reliable min are absent. */
public Map<Integer, Object> getMinValues()
{
return minValues;
}

/** Returns per-field maximum values, keyed by Iceberg field id; fields without a reliable max are absent. */
public Map<Integer, Object> getMaxValues()
{
return maxValues;
}

/** Returns per-field null-value counts, keyed by Iceberg field id; fields missing the metric in any file are absent. */
public Map<Integer, Long> getNullCounts()
{
return nullCounts;
}

/** Returns per-field column sizes summed across files, keyed by Iceberg field id. */
public Map<Integer, Long> getColumnSizes()
{
return columnSizes;
}

public static class Builder
{
private final Map<Integer, Type.PrimitiveType> idToTypeMapping;
private final List<Types.NestedField> columns;
private final TypeManager typeManager;
private final Map<Integer, Optional<Long>> nullCounts = new HashMap<>();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add empty line between immutable and mutable state

private final Map<Integer, ColumnStatistics> columnStatistics = new HashMap<>();
private final Map<Integer, Long> columnSizes = new HashMap<>();
private final Map<Integer, io.trino.spi.type.Type> fieldIdToTrinoType;

private long recordCount;
private long fileCount;
private long size;

public Builder(
Map<Integer, Type.PrimitiveType> idToTypeMapping,
List<Types.NestedField> columns,
TypeManager typeManager)
{
this.idToTypeMapping = ImmutableMap.copyOf(requireNonNull(idToTypeMapping, "idToTypeMapping is null"));
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
this.typeManager = requireNonNull(typeManager, "typeManager is null");

this.fieldIdToTrinoType = columns.stream()
.collect(toImmutableMap(Types.NestedField::fieldId, column -> toTrinoType(column.type(), typeManager)));
}

public void acceptDataFile(DataFile dataFile, PartitionSpec partitionSpec)
{
fileCount++;
recordCount += dataFile.recordCount();
size += dataFile.fileSizeInBytes();

Map<Integer, Long> newColumnSizes = dataFile.columnSizes();
if (newColumnSizes != null) {
Comment on lines 148 to 149
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can getOrDefault to avoid null check.

for (Types.NestedField column : columns) {
int id = column.fieldId();
Long addedSize = newColumnSizes.get(id);
if (addedSize != null) {
Comment on lines 152 to 153
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can getOrDefault to avoid null check.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or use if (newColumnSizes.containsKey(id)) if there's any possibility that the map could contain nulls.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know if a null column size is treated differently from a 0L data size? I'm just thinking that default might have an impact on CBO

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, it’s different (no info vs zero-length)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but, if you don't have addedSize for some files, and you do for some files, you produces too small a value now.
it could be reasonable to average size per row for files where this info is available, and hen multiply by number of rows

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self, didn't do this yet.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

columnSizes.merge(id, addedSize, Long::sum);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Iceberg report column size for all types, or only variable width?

we could limit the calculations to variable width only, as otherwise the stat is ignored by the engine.

(optional, really_

}
}
}

Set<Integer> identityPartitionFieldIds = partitionSpec.fields().stream()
.filter(field -> field.transform().isIdentity())
.map(PartitionField::sourceId)
.collect(toImmutableSet());
Map<Integer, Optional<String>> partitionValues = getPartitionKeys(dataFile.partition(), partitionSpec);
Map<Integer, Object> lowerBounds = convertBounds(dataFile.lowerBounds());
Map<Integer, Object> upperBounds = convertBounds(dataFile.upperBounds());
for (Types.NestedField column : partitionSpec.schema().columns()) {
int id = column.fieldId();
io.trino.spi.type.Type trinoType = fieldIdToTrinoType.get(id);
if (identityPartitionFieldIds.contains(id)) {
verify(partitionValues.containsKey(id), "Unable to find value for partition column with field id " + id);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what breaks the schema evolution.
I notice that you didn't have this check and deemed OK.

Maybe we want if (identityPartitionFieldIds.contains(id) && partitionValues.containsKey(id)), so that we still try to take min/max from file stats?

Actually, is identityPartitionFieldIds.contains(id) important?
if partitionValues.containsKey(id) should be enough. The current table partitioning is not important when calculating the stats.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, is identityPartitionFieldIds.contains(id) important?

I was using that to proxy checking if the partition has a transform. If the partitioning is on hour(ts) we can't use the partition information to calculate max(ts), but you're right we can't use the current partitioning it needs to be the spec for that file.

Optional<String> partitionValue = partitionValues.get(id);
if (partitionValue.isPresent()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

verify partitionValue isn't null and provide meaningful message when it is

Object trinoValue = deserializePartitionValue(trinoType, partitionValue.get(), column.name());
// Update min/max stats but there are no null values to count
updateMinMaxStats(
id,
trinoType,
trinoValue,
trinoValue,
Optional.of(0L),
dataFile.recordCount());
updateNullCountStats(id, Optional.of(0L));
Comment thread
findepi marked this conversation as resolved.
Outdated
}
else {
// Update null counts, but do not clear min/max
updateNullCountStats(id, Optional.of(dataFile.recordCount()));
}
}
else {
Object lowerBound = convertIcebergValueToTrino(column.type(), lowerBounds.getOrDefault(id, null));
Object upperBound = convertIcebergValueToTrino(column.type(), upperBounds.getOrDefault(id, null));
Optional<Long> nullCount = Optional.ofNullable(dataFile.nullValueCounts().get(id));
updateMinMaxStats(
id,
trinoType,
lowerBound,
upperBound,
nullCount,
dataFile.recordCount());
updateNullCountStats(id, nullCount);
}
}
}

/**
 * Builds the immutable statistics snapshot from the accumulated state.
 * Fields whose min/max or null-count estimates were invalidated are omitted from the
 * corresponding maps.
 */
public IcebergStatistics build()
{
    ImmutableMap.Builder<Integer, Object> mins = ImmutableMap.builder();
    ImmutableMap.Builder<Integer, Object> maxes = ImmutableMap.builder();
    for (Map.Entry<Integer, ColumnStatistics> entry : columnStatistics.entrySet()) {
        Integer fieldId = entry.getKey();
        ColumnStatistics statistics = entry.getValue();
        statistics.getMin().ifPresent(value -> mins.put(fieldId, value));
        statistics.getMax().ifPresent(value -> maxes.put(fieldId, value));
    }

    // Keep only the null-count estimates that survived aggregation (empty = invalidated)
    ImmutableMap.Builder<Integer, Long> nullCountEstimates = ImmutableMap.builder();
    this.nullCounts.forEach((fieldId, count) -> count.ifPresent(value -> nullCountEstimates.put(fieldId, value)));

    return new IcebergStatistics(
            recordCount,
            fileCount,
            size,
            mins.buildOrThrow(),
            maxes.buildOrThrow(),
            nullCountEstimates.buildOrThrow(),
            ImmutableMap.copyOf(columnSizes));
}

/**
 * Adds a file's null count for the given field to the running total.
 * If any file is missing the null count for a column, the estimate is permanently
 * invalidated (collapsed to empty).
 */
private void updateNullCountStats(int id, Optional<Long> nullCount)
{
    nullCounts.merge(id, nullCount, (existingCount, newCount) -> {
        if (existingCount.isEmpty() || newCount.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(existingCount.get() + newCount.get());
    });
}

/**
 * Folds one file's bounds into the running min/max for the given field.
 * Skips non-orderable types, and skips files whose rows are all null for this column
 * (such files carry no bound information and must not invalidate the stats).
 *
 * @param id Iceberg field id
 * @param type Trino type of the column, used to resolve the comparison operator
 * @param lowerBound Trino-encoded lower bound, or null when the file has none
 * @param upperBound Trino-encoded upper bound, or null when the file has none
 * @param nullCount the file's null count for this column, when known
 * @param recordCount the file's total row count
 */
private void updateMinMaxStats(
int id,
io.trino.spi.type.Type type,
@Nullable Object lowerBound,
@Nullable Object upperBound,
Optional<Long> nullCount,
long recordCount)
{
// If this column is only nulls for this file, don't update or invalidate min/max statistics
if (type.isOrderable() && (nullCount.isEmpty() || nullCount.get() != recordCount)) {
// Capture the initial bounds during construction so there are always valid min/max values to compare to. This does make the first call to
// `ColumnStatistics#updateMinMax` a no-op.
columnStatistics.computeIfAbsent(id, ignored -> {
MethodHandle comparisonHandle = typeManager.getTypeOperators()
.getComparisonUnorderedLastOperator(type, simpleConvention(FAIL_ON_NULL, NEVER_NULL, NEVER_NULL));
return new ColumnStatistics(comparisonHandle, lowerBound, upperBound);
}).updateMinMax(lowerBound, upperBound);
}
}

/**
* Converts a file's column bounds to a Map from field id to Iceberg Object representation
* @param idToMetricMap A Map from field id to Iceberg ByteBuffer representation
* @return A Map from field id to Iceberg Object representation
*/
private Map<Integer, Object> convertBounds(@Nullable Map<Integer, ByteBuffer> idToMetricMap)
{
if (idToMetricMap == null) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mark arg @Nullable

return ImmutableMap.of();
}
ImmutableMap.Builder<Integer, Object> map = ImmutableMap.builder();
idToMetricMap.forEach((id, value) -> {
Type.PrimitiveType type = idToTypeMapping.get(id);
verify(type != null, "No type for column id %s, known types: %s", id, idToTypeMapping);
Object icebergRepresentation = Conversions.fromByteBuffer(type, value);
if (icebergRepresentation != null) {
map.put(id, icebergRepresentation);
}
});
return map.buildOrThrow();
}
}

private static class ColumnStatistics
{
private final MethodHandle comparisonHandle;

private Optional<Object> min;
private Optional<Object> max;

public ColumnStatistics(MethodHandle comparisonHandle, Object initialMin, Object initialMax)
{
this.comparisonHandle = requireNonNull(comparisonHandle, "comparisonHandle is null");
this.min = Optional.ofNullable(initialMin);
this.max = Optional.ofNullable(initialMax);
}

/**
* Gets the minimum value accumulated during stats collection.
* @return Empty if the statistics contained values which were not comparable, otherwise returns the min value.
*/
public Optional<Object> getMin()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document the meaning of empty.

{
return min;
}

/**
* Gets the maximum value accumulated during stats collection.
* @return Empty if the statistics contained values which were not comparable, otherwise returns the max value.
*/
public Optional<Object> getMax()
{
return max;
}

/**
* @param lowerBound Trino encoded lower bound value from a file
* @param upperBound Trino encoded upper bound value from a file
*/
public void updateMinMax(Object lowerBound, Object upperBound)
{
// Update the stats, as long as they haven't already been invalidated
if (min.isPresent()) {
if (lowerBound == null) {
min = Optional.empty();
}
else if (compareTrinoValue(lowerBound, min.get()) < 0) {
min = Optional.of(lowerBound);
}
}

if (max.isPresent()) {
if (upperBound == null) {
max = Optional.empty();
}
else if (compareTrinoValue(upperBound, max.get()) > 0) {
max = Optional.of(upperBound);
}
}
}

private long compareTrinoValue(Object value, Object otherValue)
{
try {
return (Long) comparisonHandle.invoke(value, otherValue);
}
catch (Throwable throwable) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unable to compare Iceberg min/max values", throwable);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg;

import io.airlift.slice.Slices;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.UuidType;
Expand Down Expand Up @@ -75,9 +76,7 @@ public static Object convertIcebergValueToTrino(Type icebergType, Object value)
return utf8Slice(((String) value));
}
if (icebergType instanceof Types.BinaryType) {
// TODO the client sees the bytearray's tostring ouput instead of seeing actual bytes, needs to be fixed.
// TODO return Slice
return ((ByteBuffer) value).array().clone();
return Slices.wrappedBuffer(((ByteBuffer) value).array().clone());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove // TODO the client sees the bytearray's tostring ouput instead of se too

}
if (icebergType instanceof Types.DateType) {
return ((Integer) value).longValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,11 @@ public static Object deserializePartitionValue(Type type, String valueString, St
*/
public static Map<Integer, Optional<String>> getPartitionKeys(FileScanTask scanTask)
{
StructLike partition = scanTask.file().partition();
PartitionSpec spec = scanTask.spec();
return getPartitionKeys(scanTask.file().partition(), scanTask.spec());
}

public static Map<Integer, Optional<String>> getPartitionKeys(StructLike partition, PartitionSpec spec)
{
Map<PartitionField, Integer> fieldToIndex = getIdentityPartitions(spec);
ImmutableMap.Builder<Integer, Optional<String>> partitionKeys = ImmutableMap.builder();

Expand Down
Loading