diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergStatistics.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergStatistics.java new file mode 100644 index 000000000000..9c679f2fdfb8 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergStatistics.java @@ -0,0 +1,347 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.trino.spi.TrinoException; +import io.trino.spi.type.TypeManager; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; + +import java.lang.invoke.MethodHandle; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino; +import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue; +import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys; +import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; +import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; +import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.NEVER_NULL; +import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL; +import static io.trino.spi.function.InvocationConvention.simpleConvention; +import static java.util.Objects.requireNonNull; + +@Immutable +final class IcebergStatistics +{ + private final long recordCount; + private final long fileCount; + private final long size; + private final Map minValues; + private final Map maxValues; + private final Map nullCounts; + private final Map columnSizes; + + private IcebergStatistics( + long recordCount, + long fileCount, + long size, + Map minValues, + Map maxValues, + Map nullCounts, + Map columnSizes) + { + this.recordCount = recordCount; + this.fileCount = fileCount; + this.size = size; + this.minValues = ImmutableMap.copyOf(requireNonNull(minValues, "minValues is null")); + this.maxValues = ImmutableMap.copyOf(requireNonNull(maxValues, "maxValues is null")); + this.nullCounts = ImmutableMap.copyOf(requireNonNull(nullCounts, "nullCounts is null")); + this.columnSizes = ImmutableMap.copyOf(requireNonNull(columnSizes, "columnSizes is null")); + } + + public long getRecordCount() + { + return recordCount; + } + + public long getFileCount() + { + return fileCount; + } + + public long getSize() + { + return size; + } + + public Map getMinValues() + { + return minValues; + } + + public Map getMaxValues() + { + return maxValues; + } + + public Map getNullCounts() + { + return nullCounts; + } + + public Map getColumnSizes() + { + return columnSizes; + } + + public static class Builder + { + private final Map idToTypeMapping; + private final List columns; + private final TypeManager typeManager; + private final Map> nullCounts = new HashMap<>(); + private final Map columnStatistics = new HashMap<>(); + private final Map columnSizes = new HashMap<>(); + private final Map fieldIdToTrinoType; + + private long recordCount; + private long fileCount; + private long size; + + public Builder( + Map idToTypeMapping, + List columns, + TypeManager typeManager) + { + this.idToTypeMapping = ImmutableMap.copyOf(requireNonNull(idToTypeMapping, "idToTypeMapping is null")); + this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null")); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + + this.fieldIdToTrinoType = columns.stream() + .collect(toImmutableMap(Types.NestedField::fieldId, column -> toTrinoType(column.type(), typeManager))); + } + + public void acceptDataFile(DataFile dataFile, PartitionSpec partitionSpec) + { + fileCount++; + recordCount += dataFile.recordCount(); + size += dataFile.fileSizeInBytes(); + + Map newColumnSizes = dataFile.columnSizes(); + if (newColumnSizes != null) { + for (Types.NestedField column : columns) { + int id = column.fieldId(); + Long addedSize = newColumnSizes.get(id); + if (addedSize != null) { + columnSizes.merge(id, addedSize, Long::sum); + } + } + } + + Set identityPartitionFieldIds = partitionSpec.fields().stream() + .filter(field -> field.transform().isIdentity()) + .map(PartitionField::sourceId) + .collect(toImmutableSet()); + Map> partitionValues = getPartitionKeys(dataFile.partition(), partitionSpec); + Map lowerBounds = convertBounds(dataFile.lowerBounds()); + Map upperBounds = convertBounds(dataFile.upperBounds()); + for (Types.NestedField column : partitionSpec.schema().columns()) { + int id = column.fieldId(); + io.trino.spi.type.Type trinoType = fieldIdToTrinoType.get(id); + if (identityPartitionFieldIds.contains(id)) { + verify(partitionValues.containsKey(id), "Unable to find value for partition column with field id " + id); + Optional partitionValue = partitionValues.get(id); + if (partitionValue.isPresent()) { + Object trinoValue = deserializePartitionValue(trinoType, partitionValue.get(), column.name()); + // Update min/max stats but there are no null values to count + updateMinMaxStats( + id, + trinoType, + trinoValue, + trinoValue, + Optional.of(0L), + dataFile.recordCount()); + updateNullCountStats(id, Optional.of(0L)); + } + else { + // Update null counts, but do not clear min/max + updateNullCountStats(id, Optional.of(dataFile.recordCount())); + } + } + else { + Object lowerBound = convertIcebergValueToTrino(column.type(), lowerBounds.getOrDefault(id, null)); + Object upperBound = convertIcebergValueToTrino(column.type(), upperBounds.getOrDefault(id, null)); + Optional nullCount = Optional.ofNullable(dataFile.nullValueCounts().get(id)); + updateMinMaxStats( + id, + trinoType, + lowerBound, + upperBound, + nullCount, + dataFile.recordCount()); + updateNullCountStats(id, nullCount); + } + } + } + + public IcebergStatistics build() + { + ImmutableMap.Builder minValues = ImmutableMap.builder(); + ImmutableMap.Builder maxValues = ImmutableMap.builder(); + + columnStatistics.forEach((fieldId, statistics) -> { + statistics.getMin().ifPresent(min -> minValues.put(fieldId, min)); + statistics.getMax().ifPresent(max -> maxValues.put(fieldId, max)); + }); + + Map nullCounts = this.nullCounts.entrySet().stream() + .filter(entry -> entry.getValue().isPresent()) + .collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().orElseThrow())); + + return new IcebergStatistics( + recordCount, + fileCount, + size, + minValues.buildOrThrow(), + maxValues.buildOrThrow(), + nullCounts, + ImmutableMap.copyOf(columnSizes)); + } + + private void updateNullCountStats(int id, Optional nullCount) + { + // If one file is missing nullCounts for a column, invalidate the estimate + nullCounts.merge(id, nullCount, (existingCount, newCount) -> + existingCount.isPresent() && newCount.isPresent() ? Optional.of(existingCount.get() + newCount.get()) : Optional.empty()); + } + + private void updateMinMaxStats( + int id, + io.trino.spi.type.Type type, + @Nullable Object lowerBound, + @Nullable Object upperBound, + Optional nullCount, + long recordCount) + { + // If this column is only nulls for this file, don't update or invalidate min/max statistics + if (type.isOrderable() && (nullCount.isEmpty() || nullCount.get() != recordCount)) { + // Capture the initial bounds during construction so there are always valid min/max values to compare to. This does make the first call to + // `ColumnStatistics#updateMinMax` a no-op. + columnStatistics.computeIfAbsent(id, ignored -> { + MethodHandle comparisonHandle = typeManager.getTypeOperators() + .getComparisonUnorderedLastOperator(type, simpleConvention(FAIL_ON_NULL, NEVER_NULL, NEVER_NULL)); + return new ColumnStatistics(comparisonHandle, lowerBound, upperBound); + }).updateMinMax(lowerBound, upperBound); + } + } + + /** + * Converts a file's column bounds to a Map from field id to Iceberg Object representation + * @param idToMetricMap A Map from field id to Iceberg ByteBuffer representation + * @return A Map from field id to Iceberg Object representation + */ + private Map convertBounds(@Nullable Map idToMetricMap) + { + if (idToMetricMap == null) { + return ImmutableMap.of(); + } + ImmutableMap.Builder map = ImmutableMap.builder(); + idToMetricMap.forEach((id, value) -> { + Type.PrimitiveType type = idToTypeMapping.get(id); + verify(type != null, "No type for column id %s, known types: %s", id, idToTypeMapping); + Object icebergRepresentation = Conversions.fromByteBuffer(type, value); + if (icebergRepresentation != null) { + map.put(id, icebergRepresentation); + } + }); + return map.buildOrThrow(); + } + } + + private static class ColumnStatistics + { + private final MethodHandle comparisonHandle; + + private Optional min; + private Optional max; + + public ColumnStatistics(MethodHandle comparisonHandle, Object initialMin, Object initialMax) + { + this.comparisonHandle = requireNonNull(comparisonHandle, "comparisonHandle is null"); + this.min = Optional.ofNullable(initialMin); + this.max = Optional.ofNullable(initialMax); + } + + /** + * Gets the minimum value accumulated during stats collection. + * @return Empty if the statistics contained values which were not comparable, otherwise returns the min value. + */ + public Optional getMin() + { + return min; + } + + /** + * Gets the maximum value accumulated during stats collection. + * @return Empty if the statistics contained values which were not comparable, otherwise returns the max value. + */ + public Optional getMax() + { + return max; + } + + /** + * @param lowerBound Trino encoded lower bound value from a file + * @param upperBound Trino encoded upper bound value from a file + */ + public void updateMinMax(Object lowerBound, Object upperBound) + { + // Update the stats, as long as they haven't already been invalidated + if (min.isPresent()) { + if (lowerBound == null) { + min = Optional.empty(); + } + else if (compareTrinoValue(lowerBound, min.get()) < 0) { + min = Optional.of(lowerBound); + } + } + + if (max.isPresent()) { + if (upperBound == null) { + max = Optional.empty(); + } + else if (compareTrinoValue(upperBound, max.get()) > 0) { + max = Optional.of(upperBound); + } + } + } + + private long compareTrinoValue(Object value, Object otherValue) + { + try { + return (Long) comparisonHandle.invoke(value, otherValue); + } + catch (Throwable throwable) { + throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unable to compare Iceberg min/max values", throwable); + } + } + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java index f058ef18a827..56d4c27ca253 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTypes.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.iceberg; +import io.airlift.slice.Slices; import io.trino.spi.type.DecimalType; import io.trino.spi.type.Decimals; import io.trino.spi.type.UuidType; @@ -75,9 +76,7 @@ public static Object convertIcebergValueToTrino(Type icebergType, Object value) return utf8Slice(((String) value)); } if (icebergType instanceof Types.BinaryType) { - // TODO the client sees the bytearray's tostring ouput instead of seeing actual bytes, needs to be fixed. - // TODO return Slice - return ((ByteBuffer) value).array().clone(); + return Slices.wrappedBuffer(((ByteBuffer) value).array().clone()); } if (icebergType instanceof Types.DateType) { return ((Integer) value).longValue(); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index b8b227aae848..cc2f19bbffc0 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -324,8 +324,11 @@ public static Object deserializePartitionValue(Type type, String valueString, St */ public static Map> getPartitionKeys(FileScanTask scanTask) { - StructLike partition = scanTask.file().partition(); - PartitionSpec spec = scanTask.spec(); + return getPartitionKeys(scanTask.file().partition(), scanTask.spec()); + } + + public static Map> getPartitionKeys(StructLike partition, PartitionSpec spec) + { Map fieldToIndex = getIdentityPartitions(spec); ImmutableMap.Builder> partitionKeys = ImmutableMap.builder(); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/Partition.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/Partition.java deleted file mode 100644 index b0f53ee87f93..000000000000 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/Partition.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.iceberg; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.types.Comparators; -import org.apache.iceberg.types.Conversions; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; - -import java.nio.ByteBuffer; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Predicate; - -import static com.google.common.base.Verify.verify; -import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.toSet; - -class Partition -{ - private final Map idToTypeMapping; - private final List nonPartitionPrimitiveColumns; - private final StructLike values; - private long recordCount; - private long fileCount; - private long size; - private final Map minValues; - private final Map maxValues; - private final Map nullCounts; - private final Map columnSizes; - private final Set corruptedStats; - private boolean hasValidColumnMetrics; - - public Partition( - Map idToTypeMapping, - List nonPartitionPrimitiveColumns, - StructLike values, - long recordCount, - long size, - Map minValues, - Map maxValues, - Map nullCounts, - Map columnSizes) - { - this.idToTypeMapping = ImmutableMap.copyOf(requireNonNull(idToTypeMapping, "idToTypeMapping is null")); - this.nonPartitionPrimitiveColumns = ImmutableList.copyOf(requireNonNull(nonPartitionPrimitiveColumns, "nonPartitionPrimitiveColumns is null")); - this.values = requireNonNull(values, "values is null"); - this.recordCount = recordCount; - this.fileCount = 1; - this.size = size; - if (minValues == null || maxValues == null || nullCounts == null) { - // This class initialization is asymmetric with respect to first file - // TODO (https://github.com/trinodb/trino/issues/9716) rethink stats collection process to ensure results are correct, and in particular do not depent on ordering - this.minValues = new HashMap<>(); - this.maxValues = new HashMap<>(); - this.nullCounts = new HashMap<>(); - this.columnSizes = new HashMap<>(); - this.corruptedStats = new HashSet<>(); - this.hasValidColumnMetrics = false; - } - else { - this.minValues = new HashMap<>(minValues); - this.maxValues = new HashMap<>(maxValues); - // we are assuming if minValues is not present, max will be not be present either. - this.corruptedStats = nonPartitionPrimitiveColumns.stream() - .map(Types.NestedField::fieldId) - .filter(id -> !minValues.containsKey(id) && (!nullCounts.containsKey(id) || nullCounts.get(id) != recordCount)) - .collect(toSet()); - this.nullCounts = new HashMap<>(nullCounts); - this.columnSizes = columnSizes != null ? new HashMap<>(columnSizes) : null; - hasValidColumnMetrics = true; - } - } - - public Map getIdToTypeMapping() - { - return idToTypeMapping; - } - - public List getNonPartitionPrimitiveColumns() - { - return nonPartitionPrimitiveColumns; - } - - public StructLike getValues() - { - return values; - } - - public long getRecordCount() - { - return recordCount; - } - - public long getFileCount() - { - return fileCount; - } - - public long getSize() - { - return size; - } - - public Map getMinValues() - { - return minValues; - } - - public Map getMaxValues() - { - return maxValues; - } - - public Map getNullCounts() - { - return nullCounts; - } - - public Map getColumnSizes() - { - return columnSizes; - } - - public Set getCorruptedStats() - { - return corruptedStats; - } - - public boolean hasValidColumnMetrics() - { - return hasValidColumnMetrics; - } - - public void incrementRecordCount(long count) - { - this.recordCount += count; - } - - public void incrementFileCount() - { - this.fileCount++; - } - - public void incrementSize(long numberOfBytes) - { - this.size += numberOfBytes; - } - - /** - * The update logic is built with the following rules: - * bounds is null => if any file has a missing bound for a column, that bound will not be reported - * bounds is missing id => not reported in Parquet => that bound will not be reported - * bound value is null => not an expected case - * bound value is present => this is the normal case and bounds will be reported correctly - */ - public void updateMin(Map lowerBounds, Map nullCounts, long recordCount) - { - updateStats(this.minValues, lowerBounds, nullCounts, recordCount, i -> (i > 0)); - } - - public void updateMax(Map upperBounds, Map nullCounts, long recordCount) - { - updateStats(this.maxValues, upperBounds, nullCounts, recordCount, i -> (i < 0)); - } - - public void updateStats(Map current, Map newStat, Map nullCounts, long recordCount, Predicate predicate) - { - if (!hasValidColumnMetrics) { - return; - } - if (newStat == null || nullCounts == null) { - hasValidColumnMetrics = false; - return; - } - for (Types.NestedField column : nonPartitionPrimitiveColumns) { - int id = column.fieldId(); - - if (corruptedStats.contains(id)) { - continue; - } - - Object newValue = newStat.get(id); - // it is expected to not have min/max if all values are null for a column in the datafile and it is not a case of corrupted stats. - if (newValue == null) { - Long nullCount = nullCounts.get(id); - if ((nullCount == null) || (nullCount != recordCount)) { - current.remove(id); - corruptedStats.add(id); - } - continue; - } - - Object oldValue = current.putIfAbsent(id, newValue); - if (oldValue != null) { - Comparator comparator = Comparators.forType(idToTypeMapping.get(id)); - if (predicate.test(comparator.compare(oldValue, newValue))) { - current.put(id, newValue); - } - } - } - } - - public void updateNullCount(Map nullCounts) - { - if (!hasValidColumnMetrics) { - return; - } - if (nullCounts == null) { - hasValidColumnMetrics = false; - return; - } - nullCounts.forEach((key, counts) -> - this.nullCounts.merge(key, counts, Long::sum)); - } - - public static Map convertBounds(Map idToTypeMapping, Map idToMetricMap) - { - if (idToMetricMap == null) { - return null; - } - ImmutableMap.Builder map = ImmutableMap.builder(); - idToMetricMap.forEach((id, value) -> { - Type.PrimitiveType type = idToTypeMapping.get(id); - verify(type != null, "No type for column id %s, known types: %s", id, idToTypeMapping); - map.put(id, Conversions.fromByteBuffer(type, value)); - }); - return map.buildOrThrow(); - } -} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java index e03b59656173..9395ecc75707 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java @@ -50,10 +50,10 @@ import java.util.stream.Stream; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino; import static io.trino.plugin.iceberg.IcebergUtil.getIdentityPartitions; import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes; -import static io.trino.plugin.iceberg.Partition.convertBounds; import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.TypeUtils.writeNativeValue; @@ -179,52 +179,34 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect .useSnapshot(snapshotId.get()) .includeColumnStats(); // TODO make the cursor lazy - return buildRecordCursor(getPartitions(tableScan), icebergTable.spec().fields()); + return buildRecordCursor(getStatisticsByPartition(tableScan), icebergTable.spec().fields()); } - private Map getPartitions(TableScan tableScan) + private Map getStatisticsByPartition(TableScan tableScan) { try (CloseableIterable fileScanTasks = tableScan.planFiles()) { - Map partitions = new HashMap<>(); - + Map partitions = new HashMap<>(); for (FileScanTask fileScanTask : fileScanTasks) { DataFile dataFile = fileScanTask.file(); Types.StructType structType = fileScanTask.spec().partitionType(); StructLike partitionStruct = dataFile.partition(); StructLikeWrapper partitionWrapper = StructLikeWrapper.forType(structType).set(partitionStruct); - if (!partitions.containsKey(partitionWrapper)) { - Partition partition = new Partition( - idToTypeMapping, - nonPartitionPrimitiveColumns, - partitionStruct, - dataFile.recordCount(), - dataFile.fileSizeInBytes(), - convertBounds(idToTypeMapping, dataFile.lowerBounds()), - convertBounds(idToTypeMapping, dataFile.upperBounds()), - dataFile.nullValueCounts(), - dataFile.columnSizes()); - partitions.put(partitionWrapper, partition); - continue; - } - - Partition partition = partitions.get(partitionWrapper); - partition.incrementFileCount(); - partition.incrementRecordCount(dataFile.recordCount()); - partition.incrementSize(dataFile.fileSizeInBytes()); - partition.updateMin(convertBounds(idToTypeMapping, dataFile.lowerBounds()), dataFile.nullValueCounts(), dataFile.recordCount()); - partition.updateMax(convertBounds(idToTypeMapping, dataFile.upperBounds()), dataFile.nullValueCounts(), dataFile.recordCount()); - partition.updateNullCount(dataFile.nullValueCounts()); + partitions.computeIfAbsent( + partitionWrapper, + ignored -> new IcebergStatistics.Builder(idToTypeMapping, icebergTable.schema().columns(), typeManager)) + .acceptDataFile(dataFile, fileScanTask.spec()); } - return partitions; + return partitions.entrySet().stream() + .collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().build())); } catch (IOException e) { throw new UncheckedIOException(e); } } - private RecordCursor buildRecordCursor(Map partitions, List partitionFields) + private RecordCursor buildRecordCursor(Map partitionStatistics, List partitionFields) { List partitionTypes = partitionTypes(partitionFields); List> partitionColumnClass = partitionTypes.stream() @@ -233,7 +215,9 @@ private RecordCursor buildRecordCursor(Map partiti ImmutableList.Builder> records = ImmutableList.builder(); - for (Partition partition : partitions.values()) { + for (Map.Entry partitionEntry : partitionStatistics.entrySet()) { + StructLikeWrapper partitionStruct = partitionEntry.getKey(); + IcebergStatistics icebergStatistics = partitionEntry.getValue(); List row = new ArrayList<>(); // add data for partition columns @@ -242,7 +226,7 @@ private RecordCursor buildRecordCursor(Map partiti BlockBuilder partitionBlockBuilder = partitionRowBlockBuilder.beginBlockEntry(); for (int i = 0; i < partitionColumnTypes.size(); i++) { io.trino.spi.type.Type trinoType = partitionColumnType.getFields().get(i).getType(); - Object value = convertIcebergValueToTrino(partitionTypes.get(i), partition.getValues().get(i, partitionColumnClass.get(i))); + Object value = convertIcebergValueToTrino(partitionTypes.get(i), partitionStruct.get().get(i, partitionColumnClass.get(i))); writeNativeValue(trinoType, partitionBlockBuilder, value); } partitionRowBlockBuilder.closeEntry(); @@ -250,26 +234,27 @@ private RecordCursor buildRecordCursor(Map partiti }); // add the top level metrics. - row.add(partition.getRecordCount()); - row.add(partition.getFileCount()); - row.add(partition.getSize()); + row.add(icebergStatistics.getRecordCount()); + row.add(icebergStatistics.getFileCount()); + row.add(icebergStatistics.getSize()); // add column level metrics dataColumnType.ifPresent(dataColumnType -> { - if (!partition.hasValidColumnMetrics()) { - row.add(null); - return; - } BlockBuilder dataRowBlockBuilder = dataColumnType.createBlockBuilder(null, 1); BlockBuilder dataBlockBuilder = dataRowBlockBuilder.beginBlockEntry(); for (int i = 0; i < columnMetricTypes.size(); i++) { Integer fieldId = nonPartitionPrimitiveColumns.get(i).fieldId(); - Type.PrimitiveType type = idToTypeMapping.get(fieldId); - Object min = convertIcebergValueToTrino(type, partition.getMinValues().get(fieldId)); - Object max = convertIcebergValueToTrino(type, partition.getMaxValues().get(fieldId)); - Long nullCount = partition.getNullCounts().get(fieldId); - columnMetricTypes.get(i).writeObject(dataBlockBuilder, getColumnMetricBlock(columnMetricTypes.get(i), min, max, nullCount)); + Object min = icebergStatistics.getMinValues().get(fieldId); + Object max = icebergStatistics.getMaxValues().get(fieldId); + Long nullCount = icebergStatistics.getNullCounts().get(fieldId); + if (min == null && max == null && nullCount == null) { + row.add(null); + return; + } + + RowType columnMetricType = columnMetricTypes.get(i); + columnMetricType.writeObject(dataBlockBuilder, getColumnMetricBlock(columnMetricType, min, max, nullCount)); } dataRowBlockBuilder.closeEntry(); row.add(dataColumnType.getObject(dataRowBlockBuilder, 0)); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsMaker.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsMaker.java index bfcf90e0b19a..37dde3236565 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsMaker.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsMaker.java @@ -31,30 +31,22 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.function.Predicate; import static com.google.common.base.Verify.verify; -import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression; import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino; import static io.trino.plugin.iceberg.IcebergUtil.getColumns; -import static io.trino.plugin.iceberg.IcebergUtil.getIdentityPartitions; import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes; -import static io.trino.plugin.iceberg.Partition.convertBounds; import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; import static java.util.Objects.requireNonNull; import static java.util.function.Function.identity; -import static java.util.stream.Collectors.toSet; import static java.util.stream.Collectors.toUnmodifiableMap; public class TableStatisticsMaker @@ -93,14 +85,6 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons Map idToTypeMapping = primitiveFieldTypes(icebergTableSchema); List partitionFields = icebergTable.spec().fields(); - Set identityPartitionIds = getIdentityPartitions(icebergTable.spec()).keySet().stream() - .map(PartitionField::sourceId) - .collect(toSet()); - - List nonPartitionPrimitiveColumns = columns.stream() - .filter(column -> !identityPartitionIds.contains(column.fieldId()) && column.type().isPrimitiveType()) - .collect(toImmutableList()); - List icebergPartitionTypes = partitionTypes(partitionFields, idToTypeMapping); List columnHandles = getColumns(icebergTableSchema, typeManager); Map idToColumnHandle = columnHandles.stream() @@ -110,7 +94,7 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons for (int index = 0; index < partitionFields.size(); index++) { PartitionField field = partitionFields.get(index); Type type = icebergPartitionTypes.get(index); - idToDetailsBuilder.put(field.sourceId(), new ColumnFieldDetails( + idToDetailsBuilder.put(field.fieldId(), new ColumnFieldDetails( field, idToColumnHandle.get(field.sourceId()), type, @@ -124,7 +108,7 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons .useSnapshot(tableHandle.getSnapshotId().get()) .includeColumnStats(); - Partition summary = null; + IcebergStatistics.Builder icebergStatisticsBuilder = new IcebergStatistics.Builder(idToTypeMapping, columns, typeManager); try (CloseableIterable fileScanTasks = tableScan.planFiles()) { for (FileScanTask fileScanTask : fileScanTasks) { DataFile dataFile = fileScanTask.file(); @@ -136,35 +120,16 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons continue; } - if (summary == null) { - summary = new Partition( - idToTypeMapping, - nonPartitionPrimitiveColumns, - dataFile.partition(), - dataFile.recordCount(), - dataFile.fileSizeInBytes(), - convertBounds(idToTypeMapping, dataFile.lowerBounds()), - convertBounds(idToTypeMapping, dataFile.upperBounds()), - dataFile.nullValueCounts(), - dataFile.columnSizes()); - } - else { - summary.incrementFileCount(); - summary.incrementRecordCount(dataFile.recordCount()); - summary.incrementSize(dataFile.fileSizeInBytes()); - // TODO (https://github.com/trinodb/trino/issues/9716) for partition fields we should extract values with IcebergUtil#getPartitionKeys - updateSummaryMin(summary, partitionFields, convertBounds(idToTypeMapping, dataFile.lowerBounds()), dataFile.nullValueCounts(), dataFile.recordCount()); - updateSummaryMax(summary, partitionFields, convertBounds(idToTypeMapping, dataFile.upperBounds()), dataFile.nullValueCounts(), dataFile.recordCount()); - summary.updateNullCount(dataFile.nullValueCounts()); - updateColumnSizes(summary, dataFile.columnSizes()); - } + icebergStatisticsBuilder.acceptDataFile(dataFile, fileScanTask.spec()); } } catch (IOException e) { throw new UncheckedIOException(e); } - if (summary == null) { + IcebergStatistics summary = icebergStatisticsBuilder.build(); + + if (summary.getFileCount() == 0) { return TableStatistics.empty(); } @@ -172,7 +137,6 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons double recordCount = summary.getRecordCount(); for (IcebergColumnHandle columnHandle : idToColumnHandle.values()) { int fieldId = columnHandle.getId(); - Type icebergType = icebergTableSchema.findType(fieldId); ColumnStatistics.Builder columnBuilder = new ColumnStatistics.Builder(); Long nullCount = summary.getNullCounts().get(fieldId); if (nullCount != null) { @@ -187,7 +151,7 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons Object min = summary.getMinValues().get(fieldId); Object max = summary.getMaxValues().get(fieldId); if (min != null && max != null) { - columnBuilder.setRange(DoubleRange.from(columnHandle.getType(), convertIcebergValueToTrino(icebergType, min), convertIcebergValueToTrino(icebergType, max))); + columnBuilder.setRange(DoubleRange.from(columnHandle.getType(), min, max)); } columnHandleBuilder.put(columnHandle, columnBuilder.build()); } @@ -210,7 +174,7 @@ private boolean dataFileMatches( for (int index = 0; index < partitionFields.size(); index++) { PartitionField field = partitionFields.get(index); - int fieldId = field.sourceId(); + int fieldId = field.fieldId(); ColumnFieldDetails details = fieldDetails.get(fieldId); IcebergColumnHandle column = details.getColumnHandle(); Object value = convertIcebergValueToTrino(details.getIcebergType(), dataFile.partition().get(index, details.getJavaClass())); @@ -276,66 +240,4 @@ public Class getJavaClass() return javaClass; } } - - public void updateColumnSizes(Partition summary, Map addedColumnSizes) - { - Map columnSizes = summary.getColumnSizes(); - if (!summary.hasValidColumnMetrics() || columnSizes == null || addedColumnSizes == null) { - return; - } - for (Types.NestedField column : summary.getNonPartitionPrimitiveColumns()) { - int id = column.fieldId(); - - Long addedSize = addedColumnSizes.get(id); - if (addedSize != null) { - columnSizes.put(id, addedSize + columnSizes.getOrDefault(id, 0L)); - } - } - } - - private void updateSummaryMin(Partition summary, List partitionFields, Map lowerBounds, Map nullCounts, long recordCount) - { - summary.updateStats(summary.getMinValues(), lowerBounds, nullCounts, recordCount, i -> (i > 0)); - updatePartitionedStats(summary, partitionFields, summary.getMinValues(), lowerBounds, i -> (i > 0)); - } - - private void updateSummaryMax(Partition summary, List partitionFields, Map upperBounds, Map nullCounts, long recordCount) - { - summary.updateStats(summary.getMaxValues(), upperBounds, nullCounts, recordCount, i -> (i < 0)); - updatePartitionedStats(summary, partitionFields, summary.getMaxValues(), upperBounds, i -> (i < 0)); - } - - private void updatePartitionedStats( - Partition summary, - List partitionFields, - Map current, - Map newStats, - // TODO (https://github.com/trinodb/trino/issues/9716) replace with something like a comparator, or comparator factory - Predicate predicate) - { - if (newStats == null) { - // TODO (https://github.com/trinodb/trino/issues/9716) if some/many files miss statistics, we should probably invalidate statistics collection, see Partition#hasValidColumnMetrics - return; - } - for (PartitionField field : partitionFields) { - int id = field.sourceId(); - if (summary.getCorruptedStats().contains(id)) { - continue; - } - - Object newValue = newStats.get(id); - if (newValue == null) { - // TODO (https://github.com/trinodb/trino/issues/9716) if some/many files miss statistics, we should probably invalidate statistics collection, see Partition#hasValidColumnMetrics - continue; - } - - Object oldValue = current.putIfAbsent(id, newValue); - if (oldValue != null) { - Comparator comparator = Comparators.forType(summary.getIdToTypeMapping().get(id)); - if (predicate.test(comparator.compare(oldValue, newValue))) { - current.put(id, newValue); - } - } - } - } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index c7620657401a..b39284c4669f 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -83,7 +83,6 @@ import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.MoreCollectors.onlyElement; -import static com.google.common.collect.MoreCollectors.toOptional; import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static io.trino.SystemSessionProperties.PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS; import static io.trino.plugin.hive.HdfsEnvironment.HdfsContext; @@ -733,58 +732,26 @@ public void testCreatePartitionedTable() // SHOW STATS if (format == ORC) { - assertThat(query("SHOW STATS FOR test_partitioned_table")) - .projected(0, 2, 3, 4, 5, 6) // ignore data size which is varying for Parquet (and not available for ORC) - .skippingTypesCheck() - .satisfies(result -> { - // TODO https://github.com/trinodb/trino/issues/9716 stats results are non-deterministic - // once fixed, replace with assertThat(query(...)).matches(...) - MaterializedRow aSampleColumnStatsRow = result.getMaterializedRows().stream() - .filter(row -> "a_boolean".equals(row.getField(0))) - .collect(toOptional()).orElseThrow(); - if (aSampleColumnStatsRow.getField(2) == null) { - assertEqualsIgnoreOrder(result, computeActual("VALUES " + - " ('a_boolean', NULL, NULL, NULL, 'true', 'true'), " + - " ('an_integer', NULL, NULL, NULL, '1', '1'), " + - " ('a_bigint', NULL, NULL, NULL, '1', '1'), " + - " ('a_real', NULL, NULL, NULL, '1.0', '1.0'), " + - " ('a_double', NULL, NULL, NULL, '1.0', '1.0'), " + - " ('a_short_decimal', NULL, NULL, NULL, '1.0', '1.0'), " + - " ('a_long_decimal', NULL, NULL, NULL, '11.0', '11.0'), " + - " ('a_varchar', NULL, NULL, NULL, NULL, NULL), " + - " ('a_varbinary', NULL, NULL, NULL, NULL, NULL), " + - " ('a_date', NULL, NULL, NULL, '2021-07-24', '2021-07-24'), " + - " ('a_time', NULL, NULL, NULL, NULL, NULL), " + - " ('a_timestamp', NULL, NULL, NULL, '2021-07-24 03:43:57.987000', '2021-07-24 03:43:57.987999'), " + - " ('a_timestamptz', NULL, NULL, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + - " ('a_uuid', NULL, NULL, NULL, NULL, NULL), " + - " ('a_row', NULL, NULL, NULL, NULL, NULL), " + - " ('an_array', NULL, NULL, NULL, NULL, NULL), " + - " ('a_map', NULL, NULL, NULL, NULL, NULL), " + - " (NULL, NULL, NULL, 2e0, NULL, NULL)")); - } - else { - assertEqualsIgnoreOrder(result, computeActual("VALUES " + - " ('a_boolean', NULL, 0e0, NULL, 'true', 'true'), " + - " ('an_integer', NULL, 0e0, NULL, '1', '1'), " + - " ('a_bigint', NULL, 0e0, NULL, '1', '1'), " + - " ('a_real', NULL, 0e0, NULL, '1.0', '1.0'), " + - " ('a_double', NULL, 0e0, NULL, '1.0', '1.0'), " + - " ('a_short_decimal', NULL, 0e0, NULL, '1.0', '1.0'), " + - " ('a_long_decimal', NULL, 0e0, NULL, '11.0', '11.0'), " + - " ('a_varchar', NULL, 0e0, NULL, NULL, NULL), " + - " ('a_varbinary', NULL, 0e0, NULL, NULL, NULL), " + - " ('a_date', NULL, 0e0, NULL, '2021-07-24', '2021-07-24'), " + - " ('a_time', NULL, 0e0, NULL, NULL, NULL), " + - " ('a_timestamp', NULL, 0e0, NULL, '2021-07-24 03:43:57.987000', '2021-07-24 03:43:57.987999'), " + - " ('a_timestamptz', NULL, 0e0, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + - " ('a_uuid', NULL, 0e0, NULL, NULL, NULL), " + - " ('a_row', NULL, 0e0, NULL, NULL, NULL), " + - " ('an_array', NULL, 0e0, NULL, NULL, NULL), " + - " ('a_map', NULL, 0e0, NULL, NULL, NULL), " + - " (NULL, NULL, NULL, 2e0, NULL, NULL)")); - } - }); + assertQuery("SHOW STATS FOR test_partitioned_table", + "VALUES " + + " ('a_boolean', NULL, NULL, 0.5, NULL, 'true', 'true'), " + + " ('an_integer', NULL, NULL, 0.5, NULL, '1', '1'), " + + " ('a_bigint', NULL, NULL, 0.5, NULL, '1', '1'), " + + " ('a_real', NULL, NULL, 0.5, NULL, '1.0', '1.0'), " + + " ('a_double', NULL, NULL, 0.5, NULL, '1.0', '1.0'), " + + " ('a_short_decimal', NULL, NULL, 0.5, NULL, '1.0', '1.0'), " + + " ('a_long_decimal', NULL, NULL, 0.5, NULL, '11.0', '11.0'), " + + " ('a_varchar', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a_varbinary', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a_date', NULL, NULL, 0.5, NULL, '2021-07-24', '2021-07-24'), " + + " ('a_time', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a_timestamp', NULL, NULL, 0.5, NULL, '2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'), " + + " ('a_timestamptz', NULL, NULL, 0.5, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + + " ('a_uuid', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a_row', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('an_array', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " ('a_map', NULL, NULL, 0.5, NULL, NULL, NULL), " + + " (NULL, NULL, NULL, NULL, 2e0, NULL, NULL)"); } else { assertThat(query("SHOW STATS FOR test_partitioned_table")) @@ -1057,6 +1024,39 @@ public void testSchemaEvolution() dropTable("test_schema_evolution_drop_middle"); } + @Test + public void testShowStatsAfterAddColumn() + { + assertUpdate("CREATE TABLE test_show_stats_after_add_column (col0 INTEGER, col1 INTEGER, col2 INTEGER)"); + // Insert separately to ensure the table has multiple data files + assertUpdate("INSERT INTO test_show_stats_after_add_column VALUES (1, 2, 3)", 1); + assertUpdate("INSERT INTO test_show_stats_after_add_column VALUES (4, 5, 6)", 1); + assertUpdate("INSERT INTO test_show_stats_after_add_column VALUES (NULL, NULL, NULL)", 1); + assertUpdate("INSERT INTO test_show_stats_after_add_column VALUES (7, 8, 9)", 1); + + assertThat(query("SHOW STATS FOR test_show_stats_after_add_column")) + .projected(0, 2, 3, 4, 5, 6) // ignore data size which is available for Parquet, but not for ORC + .skippingTypesCheck() + .matches("VALUES " + + " ('col0', NULL, 25e-2, NULL, '1', '7')," + + " ('col1', NULL, 25e-2, NULL, '2', '8'), " + + " ('col2', NULL, 25e-2, NULL, '3', '9'), " + + " (NULL, NULL, NULL, 4e0, NULL, NULL)"); + + // Columns added after some data files exist will not have valid statistics because not all files have min/max/null count statistics for the new column + assertUpdate("ALTER TABLE test_show_stats_after_add_column ADD COLUMN col3 INTEGER"); + assertUpdate("INSERT INTO test_show_stats_after_add_column VALUES (10, 11, 12, 13)", 1); + assertThat(query("SHOW STATS FOR test_show_stats_after_add_column")) + .projected(0, 2, 3, 4, 5, 6) + .skippingTypesCheck() + .matches("VALUES " + + " ('col0', NULL, 2e-1, NULL, '1', '10')," + + " ('col1', NULL, 2e-1, NULL, '2', '11'), " + + " ('col2', NULL, 2e-1, NULL, '3', '12'), " + + " ('col3', NULL, NULL, NULL, NULL, NULL), " + + " (NULL, NULL, NULL, 5e0, NULL, NULL)"); + } + @Test public void testLargeInOnPartitionedColumns() { @@ -2642,84 +2642,28 @@ public void testAllAvailableTypes() .matches(nullValues); // SHOW STATS - if (format == ORC) { - assertThat(query("SHOW STATS FOR test_all_types")) - .projected(0, 2, 3, 4, 5, 6) // ignore data size which is varying for Parquet (and not available for ORC) - .skippingTypesCheck() - .satisfies(result -> { - // TODO https://github.com/trinodb/trino/issues/9716 stats results are non-deterministic - // once fixed, replace with assertThat(query(...)).matches(...) - MaterializedRow aSampleColumnStatsRow = result.getMaterializedRows().stream() - .filter(row -> "a_boolean".equals(row.getField(0))) - .collect(toOptional()).orElseThrow(); - if (aSampleColumnStatsRow.getField(2) == null) { - assertEqualsIgnoreOrder(result, computeActual("VALUES " + - " ('a_boolean', NULL, NULL, NULL, NULL, NULL), " + - " ('an_integer', NULL, NULL, NULL, NULL, NULL), " + - " ('a_bigint', NULL, NULL, NULL, NULL, NULL), " + - " ('a_real', NULL, NULL, NULL, NULL, NULL), " + - " ('a_double', NULL, NULL, NULL, NULL, NULL), " + - " ('a_short_decimal', NULL, NULL, NULL, NULL, NULL), " + - " ('a_long_decimal', NULL, NULL, NULL, NULL, NULL), " + - " ('a_varchar', NULL, NULL, NULL, NULL, NULL), " + - " ('a_varbinary', NULL, NULL, NULL, NULL, NULL), " + - " ('a_date', NULL, NULL, NULL, NULL, NULL), " + - " ('a_time', NULL, NULL, NULL, NULL, NULL), " + - " ('a_timestamp', NULL, NULL, NULL, NULL, NULL), " + - " ('a_timestamptz', NULL, NULL, NULL, NULL, NULL), " + - " ('a_uuid', NULL, NULL, NULL, NULL, NULL), " + - " ('a_row', NULL, NULL, NULL, NULL, NULL), " + - " ('an_array', NULL, NULL, NULL, NULL, NULL), " + - " ('a_map', NULL, NULL, NULL, NULL, NULL), " + - " (NULL, NULL, NULL, 2e0, NULL, NULL)")); - } - else { - assertEqualsIgnoreOrder(result, computeActual("VALUES " + - " ('a_boolean', NULL, 0e0, NULL, 'true', 'true'), " + - " ('an_integer', NULL, 0e0, NULL, '1', '1'), " + - " ('a_bigint', NULL, 0e0, NULL, '1', '1'), " + - " ('a_real', NULL, 0e0, NULL, '1.0', '1.0'), " + - " ('a_double', NULL, 0e0, NULL, '1.0', '1.0'), " + - " ('a_short_decimal', NULL, 0e0, NULL, '1.0', '1.0'), " + - " ('a_long_decimal', NULL, 0e0, NULL, '11.0', '11.0'), " + - " ('a_varchar', NULL, 0e0, NULL, NULL, NULL), " + - " ('a_varbinary', NULL, 0e0, NULL, NULL, NULL), " + - " ('a_date', NULL, 0e0, NULL, '2021-07-24', '2021-07-24'), " + - " ('a_time', NULL, 0e0, NULL, NULL, NULL), " + - " ('a_timestamp', NULL, 0e0, NULL, '2021-07-24 03:43:57.987000', '2021-07-24 03:43:57.987999'), " + - " ('a_timestamptz', NULL, 0e0, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + - " ('a_uuid', NULL, 0e0, NULL, NULL, NULL), " + - " ('a_row', NULL, 0e0, NULL, NULL, NULL), " + - " ('an_array', NULL, 0e0, NULL, NULL, NULL), " + - " ('a_map', NULL, 0e0, NULL, NULL, NULL), " + - " (NULL, NULL, NULL, 2e0, NULL, NULL)")); - } - }); - } - else { - assertThat(query("SHOW STATS FOR test_all_types")) - .projected(0, 2, 3, 4, 5, 6) // ignore data size which is varying for Parquet (and not available for ORC) - .skippingTypesCheck() - .matches("VALUES " + - " ('a_boolean', NULL, 0.5e0, NULL, 'true', 'true'), " + - " ('an_integer', NULL, 0.5e0, NULL, '1', '1'), " + - " ('a_bigint', NULL, 0.5e0, NULL, '1', '1'), " + - " ('a_real', NULL, 0.5e0, NULL, '1.0', '1.0'), " + - " ('a_double', NULL, 0.5e0, NULL, '1.0', '1.0'), " + - " ('a_short_decimal', NULL, 0.5e0, NULL, '1.0', '1.0'), " + - " ('a_long_decimal', NULL, 0.5e0, NULL, '11.0', '11.0'), " + - " ('a_varchar', NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_varbinary', NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_date', NULL, 0.5e0, NULL, '2021-07-24', '2021-07-24'), " + - " ('a_time', NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_timestamp', NULL, 0.5e0, NULL, '2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'), " + - " ('a_timestamptz', NULL, 0.5e0, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + - " ('a_uuid', NULL, 0.5e0, NULL, NULL, NULL), " + - " ('a_row', NULL, NULL, NULL, NULL, NULL), " + - " ('an_array', NULL, NULL, NULL, NULL, NULL), " + - " ('a_map', NULL, NULL, NULL, NULL, NULL), " + - " (NULL, NULL, NULL, 2e0, NULL, NULL)"); - } + assertThat(query("SHOW STATS FOR test_all_types")) + .projected(0, 2, 3, 4, 5, 6) // ignore data size which is varying for Parquet (and not available for ORC) + .skippingTypesCheck() + .matches("VALUES " + + " ('a_boolean', NULL, 0.5e0, NULL, 'true', 'true'), " + + " ('an_integer', NULL, 0.5e0, NULL, '1', '1'), " + + " ('a_bigint', NULL, 0.5e0, NULL, '1', '1'), " + + " ('a_real', NULL, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_double', NULL, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_short_decimal', NULL, 0.5e0, NULL, '1.0', '1.0'), " + + " ('a_long_decimal', NULL, 0.5e0, NULL, '11.0', '11.0'), " + + " ('a_varchar', NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_varbinary', NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_date', NULL, 0.5e0, NULL, '2021-07-24', '2021-07-24'), " + + " ('a_time', NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_timestamp', NULL, 0.5e0, NULL, " + (format == ORC ? "'2021-07-24 03:43:57.987000', '2021-07-24 03:43:57.987999'" : "'2021-07-24 03:43:57.987654', '2021-07-24 03:43:57.987654'") + "), " + + " ('a_timestamptz', NULL, 0.5e0, NULL, '2021-07-24 04:43:57.987 UTC', '2021-07-24 04:43:57.987 UTC'), " + + " ('a_uuid', NULL, 0.5e0, NULL, NULL, NULL), " + + " ('a_row', NULL, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " + + " ('an_array', NULL, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " + + " ('a_map', NULL, " + (format == ORC ? "0.5" : "NULL") + ", NULL, NULL, NULL), " + + " (NULL, NULL, NULL, 2e0, NULL, NULL)"); // $partitions String schema = getSession().getSchema().orElseThrow(); @@ -2745,26 +2689,7 @@ public void testAllAvailableTypes() " data.a_uuid " + " FROM \"test_all_types$partitions\" ")) .matches( - format == ORC - ? "VALUES (" + - " BIGINT '2', " + - " BIGINT '2', " + - " CAST(NULL AS ROW(min boolean, max boolean, null_count bigint)), " + - " CAST(NULL AS ROW(min integer, max integer, null_count bigint)), " + - " CAST(NULL AS ROW(min bigint, max bigint, null_count bigint)), " + - " CAST(NULL AS ROW(min real, max real, null_count bigint)), " + - " CAST(NULL AS ROW(min double, max double, null_count bigint)), " + - " CAST(NULL AS ROW(min decimal(5,2), max decimal(5,2), null_count bigint)), " + - " CAST(NULL AS ROW(min decimal(38,20), max decimal(38,20), null_count bigint)), " + - " CAST(NULL AS ROW(min varchar, max varchar, null_count bigint)), " + - " CAST(NULL AS ROW(min varbinary, max varbinary, null_count bigint)), " + - " CAST(NULL AS ROW(min date, max date, null_count bigint)), " + - " CAST(NULL AS ROW(min time(6), max time(6), null_count bigint)), " + - " CAST(NULL AS ROW(min timestamp(6), max timestamp(6), null_count bigint)), " + - " CAST(NULL AS ROW(min timestamp(6) with time zone, max timestamp(6) with time zone, null_count bigint)), " + - " CAST(NULL AS ROW(min uuid, max uuid, null_count bigint)) " + - ")" - : "VALUES (" + + "VALUES (" + " BIGINT '2', " + " BIGINT '2', " + " CAST(ROW(true, true, 1) AS ROW(min boolean, max boolean, null_count bigint)), " + @@ -2775,12 +2700,21 @@ public void testAllAvailableTypes() " CAST(ROW(1, 1, 1) AS ROW(min decimal(5,2), max decimal(5,2), null_count bigint)), " + " CAST(ROW(11, 11, 1) AS ROW(min decimal(38,20), max decimal(38,20), null_count bigint)), " + " CAST(ROW('onefsadfdsf', 'onefsadfdsf', 1) AS ROW(min varchar, max varchar, null_count bigint)), " + - " CAST(ROW(X'000102f0feff', X'000102f0feff', 1) AS ROW(min varbinary, max varbinary, null_count bigint)), " + + (format == ORC ? + " CAST(ROW(NULL, NULL, 1) AS ROW(min varbinary, max varbinary, null_count bigint)), " : + " CAST(ROW(X'000102f0feff', X'000102f0feff', 1) AS ROW(min varbinary, max varbinary, null_count bigint)), ") + " CAST(ROW(DATE '2021-07-24', DATE '2021-07-24', 1) AS ROW(min date, max date, null_count bigint)), " + " CAST(ROW(TIME '02:43:57.987654', TIME '02:43:57.987654', 1) AS ROW(min time(6), max time(6), null_count bigint)), " + - " CAST(ROW(TIMESTAMP '2021-07-24 03:43:57.987654', TIMESTAMP '2021-07-24 03:43:57.987654', 1) AS ROW(min timestamp(6), max timestamp(6), null_count bigint)), " + - " CAST(ROW(TIMESTAMP '2021-07-24 04:43:57.987654 UTC', TIMESTAMP '2021-07-24 04:43:57.987654 UTC', 1) AS ROW(min timestamp(6) with time zone, max timestamp(6) with time zone, null_count bigint)), " + - " CAST(ROW(UUID '20050910-1330-11e9-ffff-2a86e4085a59', UUID '20050910-1330-11e9-ffff-2a86e4085a59', 1) AS ROW(min uuid, max uuid, null_count bigint)) " + + (format == ORC ? + " CAST(ROW(TIMESTAMP '2021-07-24 03:43:57.987000', TIMESTAMP '2021-07-24 03:43:57.987999', 1) AS ROW(min timestamp(6), max timestamp(6), null_count bigint)), " : + " CAST(ROW(TIMESTAMP '2021-07-24 03:43:57.987654', TIMESTAMP '2021-07-24 03:43:57.987654', 1) AS ROW(min timestamp(6), max timestamp(6), null_count bigint)), ") + + (format == ORC ? + " CAST(ROW(TIMESTAMP '2021-07-24 04:43:57.987000 UTC', TIMESTAMP '2021-07-24 04:43:57.987999 UTC', 1) AS ROW(min timestamp(6) with time zone, max timestamp(6) with time zone, null_count bigint)), " : + " CAST(ROW(TIMESTAMP '2021-07-24 04:43:57.987654 UTC', TIMESTAMP '2021-07-24 04:43:57.987654 UTC', 1) AS ROW(min timestamp(6) with time zone, max timestamp(6) with time zone, null_count bigint)), ") + + (format == ORC ? + " CAST(ROW(NULL, NULL, 1) AS ROW(min uuid, max uuid, null_count bigint)) " : + " CAST(ROW(UUID '20050910-1330-11e9-ffff-2a86e4085a59', UUID '20050910-1330-11e9-ffff-2a86e4085a59', 1) AS ROW(min uuid, max uuid, null_count bigint)) " + ) + ")"); assertUpdate("DROP TABLE test_all_types"); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java index ca52723ab3bb..25a5e625c838 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java @@ -1256,6 +1256,52 @@ public void testMigratedDataWithPartialNameMapping(StorageFormat storageFormat) .containsOnly(row(1, null)); } + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + public void testPartialStats() + { + String tableName = "test_partial_stats_" + randomTableSuffix(); + String sparkTableName = sparkTableName(tableName); + String trinoTableName = trinoTableName(tableName); + + onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(col0 INT, col1 INT)"); + onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 2)"); + assertThat(onTrino().executeQuery("SHOW STATS FOR " + trinoTableName)) + .containsOnly(row("col0", null, null, 0.0, null, "1", "1"), row("col1", null, null, 0.0, null, "2", "2"), row(null, null, null, null, 1.0, null, null)); + + onSpark().executeQuery("ALTER TABLE " + sparkTableName + " SET TBLPROPERTIES (write.metadata.metrics.column.col1='none')"); + onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (3, 4)"); + assertThat(onTrino().executeQuery("SHOW STATS FOR " + trinoTableName)) + .containsOnly(row("col0", null, null, 0.0, null, "1", "3"), row("col1", null, null, null, null, null, null), row(null, null, null, null, 2.0, null, null)); + + onSpark().executeQuery("DROP TABLE " + sparkTableName); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + public void testStatsAfterAddingPartitionField() + { + String tableName = "test_stats_after_adding_partition_field_" + randomTableSuffix(); + String sparkTableName = sparkTableName(tableName); + String trinoTableName = trinoTableName(tableName); + + onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(col0 INT, col1 INT)"); + onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 2)"); + assertThat(onTrino().executeQuery("SHOW STATS FOR " + trinoTableName)) + .containsOnly(row("col0", null, null, 0.0, null, "1", "1"), row("col1", null, null, 0.0, null, "2", "2"), row(null, null, null, null, 1.0, null, null)); + + onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD col1"); + onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (3, 4)"); + assertThat(onTrino().executeQuery("SHOW STATS FOR " + trinoTableName)) + .containsOnly(row("col0", null, null, 0.0, null, "1", "3"), row("col1", null, null, 0.0, null, "2", "4"), row(null, null, null, null, 2.0, null, null)); + + onSpark().executeQuery("ALTER TABLE " + sparkTableName + " DROP PARTITION FIELD col1"); + onSpark().executeQuery("ALTER TABLE " + sparkTableName + " ADD PARTITION FIELD bucket(3, col1)"); + onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (5, 6)"); + assertThat(onTrino().executeQuery("SHOW STATS FOR " + trinoTableName)) + .containsOnly(row("col0", null, null, 0.0, null, "1", "5"), row("col1", null, null, 0.0, null, "2", "6"), row(null, null, null, null, 3.0, null, null)); + + onSpark().executeQuery("DROP TABLE " + sparkTableName); + } + private static String escapeSparkString(String value) { return value.replace("\\", "\\\\").replace("'", "\\'");