diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsReader.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsReader.java index 12e98ffb5e5e..f70f5f057c5e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsReader.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsReader.java @@ -14,11 +14,11 @@ package io.trino.plugin.iceberg; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.AbstractIterator; import com.google.common.collect.AbstractSequentialIterator; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import io.airlift.log.Logger; +import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.predicate.TupleDomain; @@ -44,12 +44,12 @@ import java.io.IOException; import java.io.UncheckedIOException; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.regex.Matcher; @@ -58,7 +58,9 @@ import static com.google.common.base.Verify.verifyNotNull; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.Iterables.getOnlyElement; +import static com.google.common.collect.Streams.stream; import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled; import static io.trino.plugin.iceberg.IcebergUtil.getColumns; import static io.trino.spi.type.VarbinaryType.VARBINARY; @@ -230,10 +232,7 @@ private static Map readNdvs(Table icebergTable, long snapshotId, ImmutableMap.Builder ndvByColumnId = ImmutableMap.builder(); Set remainingColumnIds = new HashSet<>(columnIds); - Iterator statisticsFiles = walkStatisticsFiles(icebergTable, snapshotId); - while (!remainingColumnIds.isEmpty() && statisticsFiles.hasNext()) { - StatisticsFile statisticsFile = statisticsFiles.next(); - + getLatestStatisticsFile(icebergTable, snapshotId).ifPresent(statisticsFile -> { Map thetaBlobsByFieldId = statisticsFile.blobMetadata().stream() .filter(blobMetadata -> blobMetadata.type().equals(StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1)) .filter(blobMetadata -> blobMetadata.fields().size() == 1) @@ -254,7 +253,7 @@ private static Map readNdvs(Table icebergTable, long snapshotId, ndvByColumnId.put(fieldId, parseLong(ndv)); } } - } + }); // TODO (https://github.com/trinodb/trino/issues/15397): remove support for Trino-specific statistics properties Iterator> properties = icebergTable.properties().entrySet().iterator(); @@ -278,41 +277,29 @@ private static Map readNdvs(Table icebergTable, long snapshotId, } /** - * Iterates over existing statistics files present for parent snapshot chain, starting at {@code startingSnapshotId} (inclusive). + * Returns most recent statistics file for the given {@code snapshotId} */ - public static Iterator walkStatisticsFiles(Table icebergTable, long startingSnapshotId) + public static Optional getLatestStatisticsFile(Table icebergTable, long snapshotId) { - return new AbstractIterator<>() - { - private final Map statsFileBySnapshot = icebergTable.statisticsFiles().stream() - .collect(toMap( - StatisticsFile::snapshotId, - identity(), - (a, b) -> { - throw new IllegalStateException("Unexpected duplicate statistics files %s, %s".formatted(a, b)); - }, - HashMap::new)); - - private final Iterator snapshots = walkSnapshots(icebergTable, startingSnapshotId); + if (icebergTable.statisticsFiles().isEmpty()) { + return Optional.empty(); + } - @Override - protected StatisticsFile computeNext() - { - if (statsFileBySnapshot.isEmpty()) { - // Already found all statistics files - return endOfData(); - } + Map statsFileBySnapshot = icebergTable.statisticsFiles().stream() + .collect(toMap( + StatisticsFile::snapshotId, + identity(), + (file1, file2) -> { + throw new TrinoException( + ICEBERG_INVALID_METADATA, + "Table '%s' has duplicate statistics files '%s' and '%s' for snapshot ID %s" + .formatted(icebergTable, file1.path(), file2.path(), file1.snapshotId())); + })); - while (snapshots.hasNext()) { - long snapshotId = snapshots.next(); - StatisticsFile statisticsFile = statsFileBySnapshot.remove(snapshotId); - if (statisticsFile != null) { - return statisticsFile; - } - } - return endOfData(); - } - }; + return stream(walkSnapshots(icebergTable, snapshotId)) + .map(statsFileBySnapshot::get) + .filter(Objects::nonNull) + .findFirst(); } /** diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsWriter.java index 9b9303b65445..cb12451ef77b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsWriter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableStatisticsWriter.java @@ -50,8 +50,6 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.nio.ByteBuffer; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -61,11 +59,10 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.getOnlyElement; -import static com.google.common.collect.MoreCollectors.toOptional; import static com.google.common.collect.Streams.stream; import static io.trino.plugin.base.util.Closables.closeAllSuppress; import static io.trino.plugin.iceberg.TableStatisticsReader.APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY; -import static io.trino.plugin.iceberg.TableStatisticsReader.walkStatisticsFiles; +import static io.trino.plugin.iceberg.TableStatisticsReader.getLatestStatisticsFile; import static io.trino.plugin.iceberg.TableStatisticsWriter.StatsUpdateMode.INCREMENTAL_UPDATE; import static io.trino.plugin.iceberg.TableStatisticsWriter.StatsUpdateMode.REPLACE; import static java.lang.String.format; @@ -137,9 +134,7 @@ public StatisticsFile writeStatisticsFile( try (PuffinWriter writer = Puffin.write(outputFile) .createdBy("Trino version " + trinoVersion) .build()) { - table.statisticsFiles().stream() - .filter(statisticsFile -> statisticsFile.snapshotId() == snapshotId) - .collect(toOptional()) + getLatestStatisticsFile(table, snapshotId) .ifPresent(previousStatisticsFile -> copyRetainedStatistics(fileIO, previousStatisticsFile, validFieldIds, ndvSketches.keySet(), writer)); ndvSketches.entrySet().stream() @@ -198,18 +193,16 @@ private CollectedStatistics mergeStatisticsIfNecessary( return switch (updateMode) { case REPLACE -> collectedStatistics; case INCREMENTAL_UPDATE -> { - Map collectedNdvSketches = collectedStatistics.ndvSketches(); + Optional latestStatisticsFile = getLatestStatisticsFile(table, snapshotId); ImmutableMap.Builder ndvSketches = ImmutableMap.builder(); - - Set pendingPreviousNdvSketches = new HashSet<>(collectedNdvSketches.keySet()); - Iterator statisticsFiles = walkStatisticsFiles(table, snapshotId); - while (!pendingPreviousNdvSketches.isEmpty() && statisticsFiles.hasNext()) { - StatisticsFile statisticsFile = statisticsFiles.next(); - + if (latestStatisticsFile.isPresent()) { + Map collectedNdvSketches = collectedStatistics.ndvSketches(); + Set columnsWithRecentlyComputedStats = collectedNdvSketches.keySet(); + StatisticsFile statisticsFile = latestStatisticsFile.get(); boolean hasUsefulData = statisticsFile.blobMetadata().stream() .filter(blobMetadata -> blobMetadata.type().equals(StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1)) .filter(blobMetadata -> blobMetadata.fields().size() == 1) - .anyMatch(blobMetadata -> pendingPreviousNdvSketches.contains(getOnlyElement(blobMetadata.fields()))); + .anyMatch(blobMetadata -> columnsWithRecentlyComputedStats.contains(getOnlyElement(blobMetadata.fields()))); if (hasUsefulData) { try (PuffinReader reader = Puffin.read(fileIO.newInputFile(statisticsFile.path())) @@ -219,11 +212,10 @@ private CollectedStatistics mergeStatisticsIfNecessary( List toRead = reader.fileMetadata().blobs().stream() .filter(blobMetadata -> blobMetadata.type().equals(APACHE_DATASKETCHES_THETA_V1)) .filter(blobMetadata -> blobMetadata.inputFields().size() == 1) - .filter(blobMetadata -> pendingPreviousNdvSketches.contains(getOnlyElement(blobMetadata.inputFields()))) + .filter(blobMetadata -> columnsWithRecentlyComputedStats.contains(getOnlyElement(blobMetadata.inputFields()))) .collect(toImmutableList()); for (Pair read : reader.readAll(toRead)) { Integer fieldId = getOnlyElement(read.first().inputFields()); - checkState(pendingPreviousNdvSketches.remove(fieldId), "Unwanted read of stats for field %s", fieldId); Memory memory = Memory.wrap(ByteBuffers.getBytes(read.second())); // Memory.wrap(ByteBuffer) results in a different deserialized state CompactSketch previousSketch = CompactSketch.wrap(memory); CompactSketch newSketch = requireNonNull(collectedNdvSketches.get(fieldId), "ndvSketches.get(fieldId) is null"); @@ -235,7 +227,6 @@ private CollectedStatistics mergeStatisticsIfNecessary( } } } - yield new CollectedStatistics(ndvSketches.buildOrThrow()); } };