Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/src/main/sphinx/connector/delta-lake.rst
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ Table statistics
^^^^^^^^^^^^^^^^

You can use :doc:`/sql/analyze` statements in Trino to populate the table
statistics in Delta Lake. Number of distinct values (NDV)
statistics in Delta Lake. Data size and number of distinct values (NDV)
Comment thread
findepi marked this conversation as resolved.
Outdated
statistics are supported, while Minimum value, maximum value, and null value
count statistics are not supported. The :doc:`cost-based optimizer
</optimizer/cost-based-optimizations>` then uses these statistics to improve
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import io.airlift.json.JsonCodec;
Expand Down Expand Up @@ -59,6 +60,7 @@
import io.trino.plugin.hive.security.AccessControlMetadata;
import io.trino.spi.NodeManager;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.connector.Assignment;
import io.trino.spi.connector.BeginTableExecuteResult;
import io.trino.spi.connector.CatalogSchemaName;
Expand Down Expand Up @@ -95,11 +97,13 @@
import io.trino.spi.security.RoleGrant;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.statistics.ColumnStatisticMetadata;
import io.trino.spi.statistics.ColumnStatisticType;
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.statistics.TableStatistics;
import io.trino.spi.statistics.TableStatisticsMetadata;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.FixedWidthType;
import io.trino.spi.type.HyperLogLogType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
Expand Down Expand Up @@ -128,6 +132,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
Expand Down Expand Up @@ -195,6 +200,7 @@
import static io.trino.spi.predicate.ValueSet.ofRanges;
import static io.trino.spi.statistics.ColumnStatisticType.MAX_VALUE;
import static io.trino.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES_SUMMARY;
import static io.trino.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
Expand Down Expand Up @@ -242,6 +248,10 @@ public class DeltaLakeMetadata
// Matches the dummy column Databricks stores in the metastore
private static final List<Column> DUMMY_DATA_COLUMNS = ImmutableList.of(
new Column("col", HiveType.toHiveType(new ArrayType(VarcharType.createUnboundedVarcharType())), Optional.empty()));
private static final Set<ColumnStatisticType> SUPPORTED_STATISTICS_TYPE = ImmutableSet.<ColumnStatisticType>builder()
.add(TOTAL_SIZE_IN_BYTES)
.add(NUMBER_OF_DISTINCT_VALUES_SUMMARY)
.build();

private final DeltaLakeMetastore metastore;
private final HdfsEnvironment hdfsEnvironment;
Expand Down Expand Up @@ -1932,8 +1942,14 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
analyzeColumnNames
.map(columnNames -> columnNames.contains(columnMetadata.getName()))
.orElse(true))
.map(columnMetadata -> new ColumnStatisticMetadata(columnMetadata.getName(), NUMBER_OF_DISTINCT_VALUES_SUMMARY))
.forEach(columnStatistics::add);
.forEach(columnMetadata -> {
if (!(columnMetadata.getType() instanceof FixedWidthType)) {
if (statistics.isEmpty() || totalSizeStatisticsExists(statistics.get().getColumnStatistics(), columnMetadata.getName())) {
columnStatistics.add(new ColumnStatisticMetadata(columnMetadata.getName(), TOTAL_SIZE_IN_BYTES));
}
}
columnStatistics.add(new ColumnStatisticMetadata(columnMetadata.getName(), NUMBER_OF_DISTINCT_VALUES_SUMMARY));
});

// collect max(file modification time) for sake of incremental ANALYZE
columnStatistics.add(new ColumnStatisticMetadata(FILE_MODIFIED_TIME_COLUMN_NAME, MAX_VALUE));
Expand All @@ -1958,6 +1974,11 @@ private static boolean shouldCollectExtendedStatistics(ColumnMetadata columnMeta
return true;
}

private static boolean totalSizeStatisticsExists(Map<String, DeltaLakeColumnStatistics> statistics, String columnName)
{
return statistics.containsKey(columnName) && statistics.get(columnName).getTotalSizeInBytes().isPresent();
}

@Override
public ConnectorTableHandle beginStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle)
{
Expand Down Expand Up @@ -2130,26 +2151,53 @@ private static Map<String, DeltaLakeColumnStatistics> toDeltaLakeColumnStatistic
{
// Only statistics for whole table are collected
ComputedStatistics singleStatistics = Iterables.getOnlyElement(computedStatistics);
return createColumnToComputedStatisticsMap(singleStatistics.getColumnStatistics()).entrySet().stream()
.collect(toImmutableMap(Map.Entry::getKey, entry -> createDeltaLakeColumnStatistics(entry.getValue())));
}

return singleStatistics.getColumnStatistics().entrySet().stream()
.filter(not(entry -> entry.getKey().getColumnName().equals(FILE_MODIFIED_TIME_COLUMN_NAME)))
.collect(toImmutableMap(
entry -> entry.getKey().getColumnName(),
entry -> {
ColumnStatisticMetadata columnStatisticMetadata = entry.getKey();
if (columnStatisticMetadata.getStatisticType() != NUMBER_OF_DISTINCT_VALUES_SUMMARY) {
throw new TrinoException(
GENERIC_INTERNAL_ERROR,
"Unexpected statistics type " + columnStatisticMetadata.getStatisticType() + " found for column " + columnStatisticMetadata.getColumnName());
}
if (entry.getValue().isNull(0)) {
return DeltaLakeColumnStatistics.create(HyperLogLog.newInstance(4096)); // empty HLL with number of buckets used by $approx_set
}
else {
Slice serializedSummary = (Slice) blockToNativeValue(HyperLogLogType.HYPER_LOG_LOG, entry.getValue());
return DeltaLakeColumnStatistics.create(HyperLogLog.newInstance(serializedSummary));
}
}));
private static Map<String, Map<ColumnStatisticType, Block>> createColumnToComputedStatisticsMap(Map<ColumnStatisticMetadata, Block> computedStatistics)
{
ImmutableTable.Builder<String, ColumnStatisticType, Block> result = ImmutableTable.builder();
computedStatistics.forEach((metadata, block) -> {
if (metadata.getColumnName().equals(FILE_MODIFIED_TIME_COLUMN_NAME)) {
return;
Comment thread
findepi marked this conversation as resolved.
Outdated
}
if (!SUPPORTED_STATISTICS_TYPE.contains(metadata.getStatisticType())) {
throw new TrinoException(
GENERIC_INTERNAL_ERROR,
"Unexpected statistics type " + metadata.getStatisticType() + " found for column " + metadata.getColumnName());
}

result.put(metadata.getColumnName(), metadata.getStatisticType(), block);
});
return result.buildOrThrow().rowMap();
}

private static DeltaLakeColumnStatistics createDeltaLakeColumnStatistics(Map<ColumnStatisticType, Block> computedStatistics)
{
OptionalLong totalSize = OptionalLong.empty();
if (computedStatistics.containsKey(TOTAL_SIZE_IN_BYTES)) {
totalSize = getLongValue(computedStatistics.get(TOTAL_SIZE_IN_BYTES));
}
HyperLogLog ndvSummary = getHyperLogLogForNdv(computedStatistics.get(NUMBER_OF_DISTINCT_VALUES_SUMMARY));
return DeltaLakeColumnStatistics.create(totalSize, ndvSummary);
}

private static OptionalLong getLongValue(Block block)
{
if (block.isNull(0)) {
return OptionalLong.of(0);
}
return OptionalLong.of(BIGINT.getLong(block, 0));
}

private static HyperLogLog getHyperLogLogForNdv(Block block)
{
if (block.isNull(0)) {
return HyperLogLog.newInstance(4096); // number of buckets used by $approx_set
}
Slice serializedSummary = (Slice) blockToNativeValue(HyperLogLogType.HYPER_LOG_LOG, block);
return HyperLogLog.newInstance(serializedSummary);
}

private static Optional<Instant> getMaxFileModificationTime(Collection<ComputedStatistics> computedStatistics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ else if (isValidInRange(minValue)) {
if (statistics.isPresent()) {
DeltaLakeColumnStatistics deltaLakeColumnStatistics = statistics.get().getColumnStatistics().get(column.getName());
if (deltaLakeColumnStatistics != null && column.getColumnType() != PARTITION_KEY) {
deltaLakeColumnStatistics.getTotalSizeInBytes().ifPresent(size -> columnStatsBuilder.setDataSize(Estimate.of(size)));
columnStatsBuilder.setDistinctValuesCount(Estimate.of(deltaLakeColumnStatistics.getNdvSummary().cardinality()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,43 @@
import io.airlift.stats.cardinality.HyperLogLog;

import java.util.Base64;
import java.util.OptionalLong;

import static java.util.Objects.requireNonNull;

public class DeltaLakeColumnStatistics
{
private final OptionalLong totalSizeInBytes;
private final HyperLogLog ndvSummary;

@JsonCreator
public static DeltaLakeColumnStatistics create(
@JsonProperty("totalSizeInBytes") OptionalLong totalSizeInBytes,
@JsonProperty("ndvSummary") String ndvSummaryBase64)
{
requireNonNull(totalSizeInBytes, "totalSizeInBytes is null");
requireNonNull(ndvSummaryBase64, "ndvSummaryBase64 is null");
byte[] ndvSummaryBytes = Base64.getDecoder().decode(ndvSummaryBase64);
return new DeltaLakeColumnStatistics(HyperLogLog.newInstance(Slices.wrappedBuffer(ndvSummaryBytes)));
return new DeltaLakeColumnStatistics(totalSizeInBytes, HyperLogLog.newInstance(Slices.wrappedBuffer(ndvSummaryBytes)));
}

public static DeltaLakeColumnStatistics create(HyperLogLog ndvSummary)
public static DeltaLakeColumnStatistics create(OptionalLong totalSizeInBytes, HyperLogLog ndvSummary)
{
return new DeltaLakeColumnStatistics(ndvSummary);
return new DeltaLakeColumnStatistics(totalSizeInBytes, ndvSummary);
}

private DeltaLakeColumnStatistics(HyperLogLog ndvSummary)
private DeltaLakeColumnStatistics(OptionalLong totalSizeInBytes, HyperLogLog ndvSummary)
{
this.totalSizeInBytes = requireNonNull(totalSizeInBytes, "totalSizeInBytes is null");
this.ndvSummary = requireNonNull(ndvSummary, "ndvSummary is null");
}

@JsonProperty
public OptionalLong getTotalSizeInBytes()
{
return totalSizeInBytes;
}

@JsonProperty("ndvSummary")
public String getNdvSummaryBase64()
{
Expand All @@ -58,8 +69,17 @@ public HyperLogLog getNdvSummary()

public DeltaLakeColumnStatistics update(DeltaLakeColumnStatistics newStatistics)
{
OptionalLong totalSizeInBytes = mergeIntegerStatistics(this.totalSizeInBytes, newStatistics.totalSizeInBytes);
HyperLogLog ndvSummary = HyperLogLog.newInstance(this.ndvSummary.serialize());
ndvSummary.mergeWith(newStatistics.ndvSummary);
return new DeltaLakeColumnStatistics(ndvSummary);
return new DeltaLakeColumnStatistics(totalSizeInBytes, ndvSummary);
}

private static OptionalLong mergeIntegerStatistics(OptionalLong first, OptionalLong second)
{
if (first.isPresent() && second.isPresent()) {
return OptionalLong.of(first.getAsLong() + second.getAsLong());
}
return OptionalLong.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1155,8 +1155,8 @@ public void testAnalyze()
"VALUES " +
"('nationkey', null, 25.0, 0.0, null, 0, 24)," +
"('regionkey', null, 5.0, 0.0, null, 0, 4)," +
"('comment', null, 25.0, 0.0, null, null, null)," +
"('name', null, 25.0, 0.0, null, null, null)," +
"('comment', 1857.0, 25.0, 0.0, null, null, null)," +
"('name', 177.0, 25.0, 0.0, null, null, null)," +
"(null, null, null, null, 25.0, null, null)");
}

Expand Down
Loading