Merged
13 changes: 6 additions & 7 deletions docs/src/main/sphinx/connector/iceberg.rst
@@ -468,13 +468,13 @@ The default value for this property is ``7d``.
drop_extended_stats
~~~~~~~~~~~~~~~~~~~

This is an experimental command to remove extended statistics from the table.
The ``drop_extended_stats`` command removes all extended statistics information from
the table.

``drop_extended_stats`` can be run as follows:

.. code-block:: sql

SET SESSION my_catalog.experimental_extended_statistics_enabled = true;
ALTER TABLE test_table EXECUTE drop_extended_stats

.. _iceberg-alter-table-set-properties:
@@ -1256,11 +1256,10 @@ the definition and the storage table.
Table statistics
----------------

There is experimental support to collect column statistics which can be enabled by
setting the ``iceberg.experimental.extended-statistics.enabled`` catalog
configuration property or the corresponding
``experimental_extended_statistics_enabled`` session property to ``true``.
Enabling this configuration allows executing :doc:`/sql/analyze` statement to gather statistics.
The Iceberg connector can collect column statistics using the :doc:`/sql/analyze`
statement. Collection can be disabled with the ``iceberg.extended-statistics.enabled``
catalog configuration property or the corresponding
``extended_statistics_enabled`` session property.

.. _iceberg_analyze:

@@ -36,13 +36,16 @@
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.SECONDS;

@DefunctConfig("iceberg.allow-legacy-snapshot-syntax")
@DefunctConfig({
"iceberg.allow-legacy-snapshot-syntax",
"iceberg.experimental.extended-statistics.enabled",
})
public class IcebergConfig
{
public static final int FORMAT_VERSION_SUPPORT_MIN = 1;
public static final int FORMAT_VERSION_SUPPORT_MAX = 2;
public static final String EXTENDED_STATISTICS_CONFIG = "iceberg.experimental.extended-statistics.enabled";
public static final String EXTENDED_STATISTICS_DESCRIPTION = "Allow ANALYZE and use of extended statistics collected by it. Currently, the statistics are collected in Trino-specific format";
public static final String EXTENDED_STATISTICS_CONFIG = "iceberg.extended-statistics.enabled";
public static final String EXTENDED_STATISTICS_DESCRIPTION = "Allow ANALYZE and use of extended statistics collected by it";
public static final String EXPIRE_SNAPSHOTS_MIN_RETENTION = "iceberg.expire_snapshots.min-retention";
public static final String REMOVE_ORPHAN_FILES_MIN_RETENTION = "iceberg.remove_orphan_files.min-retention";

@@ -54,7 +57,7 @@ public class IcebergConfig
private CatalogType catalogType = HIVE_METASTORE;
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
private boolean tableStatisticsEnabled = true;
private boolean extendedStatisticsEnabled;
private boolean extendedStatisticsEnabled = true;
private boolean projectionPushdownEnabled = true;
private boolean registerTableProcedureEnabled;
private Optional<String> hiveCatalogName = Optional.empty();
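The IcebergConfig hunk above renames the catalog property and flips its default to true. For orientation, Airlift config classes expose such a property through a getter/setter pair annotated with @Config. The sketch below is illustrative only and not taken from this diff; the class and method names are assumptions following the usual Airlift pattern.

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;

// Illustrative sketch, not part of this PR: how the renamed property is typically
// surfaced on an Airlift config class. Names are assumptions.
public class ExtendedStatisticsConfigSketch
{
    // new default introduced by this change
    private boolean extendedStatisticsEnabled = true;

    public boolean isExtendedStatisticsEnabled()
    {
        return extendedStatisticsEnabled;
    }

    @Config("iceberg.extended-statistics.enabled")
    @ConfigDescription("Allow ANALYZE and use of extended statistics collected by it")
    public ExtendedStatisticsConfigSketch setExtendedStatisticsEnabled(boolean extendedStatisticsEnabled)
    {
        this.extendedStatisticsEnabled = extendedStatisticsEnabled;
        return this;
    }
}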
@@ -119,12 +119,14 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.UpdateStatistics;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Term;
@@ -216,9 +218,8 @@
import static io.trino.plugin.iceberg.IcebergUtil.schemaFromMetadata;
import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields;
import static io.trino.plugin.iceberg.TableStatisticsMaker.TRINO_STATS_COLUMN_ID_PATTERN;
import static io.trino.plugin.iceberg.TableStatisticsMaker.TRINO_STATS_NDV_FORMAT;
import static io.trino.plugin.iceberg.TableStatisticsMaker.TRINO_STATS_PREFIX;
import static io.trino.plugin.iceberg.TableStatisticsReader.TRINO_STATS_COLUMN_ID_PATTERN;
import static io.trino.plugin.iceberg.TableStatisticsReader.TRINO_STATS_PREFIX;
import static io.trino.plugin.iceberg.TableType.DATA;
import static io.trino.plugin.iceberg.TypeConverter.toIcebergType;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
@@ -271,7 +272,7 @@ public class IcebergMetadata
public static final String ORC_BLOOM_FILTER_COLUMNS_KEY = "orc.bloom.filter.columns";
public static final String ORC_BLOOM_FILTER_FPP_KEY = "orc.bloom.filter.fpp";

private static final String NUMBER_OF_DISTINCT_VALUES_NAME = "NUMBER_OF_DISTINCT_VALUES";
public static final String NUMBER_OF_DISTINCT_VALUES_NAME = "NUMBER_OF_DISTINCT_VALUES";
private static final FunctionName NUMBER_OF_DISTINCT_VALUES_FUNCTION = new FunctionName(IcebergThetaSketchForStats.NAME);

private static final Integer DELETE_BATCH_SIZE = 1000;
@@ -280,6 +281,7 @@ public class IcebergMetadata
private final JsonCodec<CommitTaskData> commitTaskCodec;
private final TrinoCatalog catalog;
private final TrinoFileSystemFactory fileSystemFactory;
private final TableStatisticsWriter tableStatisticsWriter;

private final Map<IcebergTableHandle, TableStatistics> tableStatisticsCache = new ConcurrentHashMap<>();

@@ -289,12 +291,14 @@ public IcebergMetadata(
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec,
TrinoCatalog catalog,
TrinoFileSystemFactory fileSystemFactory)
TrinoFileSystemFactory fileSystemFactory,
TableStatisticsWriter tableStatisticsWriter)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.catalog = requireNonNull(catalog, "catalog is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.tableStatisticsWriter = requireNonNull(tableStatisticsWriter, "tableStatisticsWriter is null");
}

@Override
@@ -1193,6 +1197,11 @@ private void executeDropExtendedStats(ConnectorSession session, IcebergTableExec

Table icebergTable = catalog.loadTable(session, executeHandle.getSchemaTableName());
beginTransaction(icebergTable);
UpdateStatistics updateStatistics = transaction.updateStatistics();
for (StatisticsFile statisticsFile : icebergTable.statisticsFiles()) {
updateStatistics.removeStatistics(statisticsFile.snapshotId());
}
updateStatistics.commit();
UpdateProperties updateProperties = transaction.updateProperties();
for (String key : transaction.table().properties().keySet()) {
if (key.startsWith(TRINO_STATS_PREFIX)) {
@@ -1578,9 +1587,10 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession

IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
checkArgument(handle.getTableType() == DATA, "Cannot analyze non-DATA table: %s", handle.getTableType());
Table icebergTable = catalog.loadTable(session, handle.getSchemaTableName());
if (handle.getSnapshotId().isPresent() && (handle.getSnapshotId().get() != icebergTable.currentSnapshot().snapshotId())) {
throw new TrinoException(NOT_SUPPORTED, "Cannot analyze old snapshot %s".formatted(handle.getSnapshotId().get()));

if (handle.getSnapshotId().isEmpty()) {
// No snapshot, table is empty
return new ConnectorAnalyzeMetadata(tableHandle, TableStatisticsMetadata.empty());
}

ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);
@@ -1627,13 +1637,36 @@ public ConnectorTableHandle beginStatisticsCollection(ConnectorSession session,
@Override
public void finishStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<ComputedStatistics> computedStatistics)
{
UpdateProperties updateProperties = transaction.updateProperties();
Map<String, Integer> columnNameToId = transaction.table().schema().columns().stream()
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table table = transaction.table();
if (handle.getSnapshotId().isEmpty()) {
// No snapshot, table is empty
verify(
computedStatistics.isEmpty(),
"Unexpected computed statistics that cannot be attached to a snapshot because none exists: %s",
computedStatistics);

// TODO (https://github.com/trinodb/trino/issues/15397): remove support for Trino-specific statistics properties
// Drop all stats. Empty table needs none
UpdateProperties updateProperties = transaction.updateProperties();
Review comment (Member): Looks like we have two blocks to remove the old properties if they exist. Would be better to have one at the top before the empty snapshotId check. But if we're going to remove this soon anyway, probably doesn't matter

Reply (Member Author), quoting "But if we're going to remove this soon anyway, probably doesn't matter": exactly

table.properties().keySet().stream()
.filter(key -> key.startsWith(TRINO_STATS_PREFIX))
.forEach(updateProperties::remove);
updateProperties.commit();

transaction.commitTransaction();
transaction = null;
return;
}
long snapshotId = handle.getSnapshotId().orElseThrow();

Map<String, Integer> columnNameToId = table.schema().columns().stream()
.collect(toImmutableMap(nestedField -> nestedField.name().toLowerCase(ENGLISH), Types.NestedField::fieldId));
Set<Integer> columnIds = ImmutableSet.copyOf(columnNameToId.values());

// TODO (https://github.com/trinodb/trino/issues/15397): remove support for Trino-specific statistics properties
// Drop stats for obsolete columns
transaction.table().properties().keySet().stream()
UpdateProperties updateProperties = transaction.updateProperties();
table.properties().keySet().stream()
.filter(key -> {
if (!key.startsWith(TRINO_STATS_PREFIX)) {
return false;
@@ -1645,7 +1678,9 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
return !columnIds.contains(Integer.parseInt(matcher.group("columnId")));
})
.forEach(updateProperties::remove);
updateProperties.commit();

ImmutableMap.Builder<Integer, CompactSketch> ndvSketches = ImmutableMap.builder();
for (ComputedStatistics computedStatistic : computedStatistics) {
verify(computedStatistic.getGroupingColumns().isEmpty() && computedStatistic.getGroupingValues().isEmpty(), "Unexpected grouping");
verify(computedStatistic.getTableStatistics().isEmpty(), "Unexpected table statistics");
@@ -1657,16 +1692,23 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
"Column not found in table: [%s]",
statisticMetadata.getColumnName());
CompactSketch sketch = DataSketchStateSerializer.deserialize(entry.getValue(), 0);
// TODO: store whole sketch to support updates, see also https://github.com/apache/iceberg-docs/pull/69
updateProperties.set(TRINO_STATS_NDV_FORMAT.formatted(columnId), Long.toString((long) sketch.getEstimate()));
ndvSketches.put(columnId, sketch);
}
else {
throw new UnsupportedOperationException("Unsupported statistic: " + statisticMetadata);
}
}
}

updateProperties.commit();
StatisticsFile statisticsFile = tableStatisticsWriter.writeStatisticsFile(
session,
table,
snapshotId,
ndvSketches.buildOrThrow());
transaction.updateStatistics()
.setStatistics(snapshotId, statisticsFile)
.commit();

transaction.commitTransaction();
transaction = null;
}
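The loop above collects one Apache DataSketches theta sketch per column and hands them to the statistics writer instead of storing scalar NDV numbers in table properties. For readers unfamiliar with that machinery, here is a standalone, hedged sketch of how a theta sketch yields an NDV estimate; it is plain DataSketches usage, not Trino code, and assumes a current org.apache.datasketches artifact on the classpath.

import org.apache.datasketches.theta.CompactSketch;
import org.apache.datasketches.theta.UpdateSketch;

public final class NdvEstimateExample
{
    public static void main(String[] args)
    {
        // Feed a stream containing exactly 1000 distinct values into an update sketch
        UpdateSketch sketch = UpdateSketch.builder().build();
        for (long value = 0; value < 10_000; value++) {
            sketch.update(value % 1_000);
        }

        // The compact form is what gets serialized, merged, and persisted
        CompactSketch compact = sketch.compact();

        // getEstimate() approximates the number of distinct values (NDV)
        long ndv = (long) compact.getEstimate();
        System.out.println("estimated NDV = " + ndv);
    }
}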
@@ -2285,7 +2327,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
originalHandle.getMaxScannedFileSize()),
handle -> {
Table icebergTable = catalog.loadTable(session, handle.getSchemaTableName());
return TableStatisticsMaker.getTableStatistics(typeManager, session, handle, icebergTable);
return TableStatisticsReader.getTableStatistics(typeManager, session, handle, icebergTable);
});
}

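The executeDropExtendedStats change earlier in this file switches from deleting Trino-specific table properties to removing Iceberg statistics files. Below is a minimal, self-contained sketch of that removal path, built only from the Iceberg API calls visible in this diff; the wrapper class and method are hypothetical, and the caller is assumed to hold an already loaded org.apache.iceberg.Table.

import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateStatistics;

public final class DropAllStatisticsFiles
{
    private DropAllStatisticsFiles() {}

    // Removes every statistics file attached to the table, mirroring the loop in
    // executeDropExtendedStats.
    public static void dropAll(Table table)
    {
        Transaction transaction = table.newTransaction();
        UpdateStatistics updateStatistics = transaction.updateStatistics();
        for (StatisticsFile statisticsFile : table.statisticsFiles()) {
            // each statistics file is keyed by the snapshot it describes
            updateStatistics.removeStatistics(statisticsFile.snapshotId());
        }
        updateStatistics.commit();
        transaction.commitTransaction();
    }
}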
@@ -29,22 +29,25 @@ public class IcebergMetadataFactory
private final JsonCodec<CommitTaskData> commitTaskCodec;
private final TrinoCatalogFactory catalogFactory;
private final TrinoFileSystemFactory fileSystemFactory;
private final TableStatisticsWriter tableStatisticsWriter;

@Inject
public IcebergMetadataFactory(
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec,
TrinoCatalogFactory catalogFactory,
TrinoFileSystemFactory fileSystemFactory)
TrinoFileSystemFactory fileSystemFactory,
TableStatisticsWriter tableStatisticsWriter)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.tableStatisticsWriter = requireNonNull(tableStatisticsWriter, "tableStatisticsWriter is null");
}

public IcebergMetadata create(ConnectorIdentity identity)
{
return new IcebergMetadata(typeManager, commitTaskCodec, catalogFactory.create(identity), fileSystemFactory);
return new IcebergMetadata(typeManager, commitTaskCodec, catalogFactory.create(identity), fileSystemFactory, tableStatisticsWriter);
}
}
@@ -69,6 +69,7 @@ public void configure(Binder binder)
configBinder(binder).bindConfig(ParquetReaderConfig.class);
configBinder(binder).bindConfig(ParquetWriterConfig.class);

binder.bind(TableStatisticsWriter.class).in(Scopes.SINGLETON);
binder.bind(IcebergMetadataFactory.class).in(Scopes.SINGLETON);

jsonCodecBinder(binder).bindJsonCodec(CommitTaskData.class);
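The single line added to the module is what makes the new constructor parameter in IcebergMetadataFactory resolvable: binding TableStatisticsWriter in Scopes.SINGLETON lets Guice construct one shared instance and hand it to any @Inject constructor that asks for it. A toy sketch of that pattern follows; it is generic Guice, not the actual Trino module, and the class names are placeholders.

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Scopes;

// Toy illustration of the singleton-binding pattern used above.
public class SingletonBindingSketch
{
    static class Writer {}

    static class Factory
    {
        final Writer writer;

        @Inject
        Factory(Writer writer)
        {
            this.writer = writer;  // same singleton instance wherever it is injected
        }
    }

    public static void main(String[] args)
    {
        Injector injector = Guice.createInjector(new AbstractModule()
        {
            @Override
            protected void configure()
            {
                bind(Writer.class).in(Scopes.SINGLETON);
                bind(Factory.class).in(Scopes.SINGLETON);
            }
        });
        Factory factory = injector.getInstance(Factory.class);
        System.out.println(factory.writer != null);
    }
}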
@@ -72,7 +72,7 @@ public final class IcebergSessionProperties
private static final String PARQUET_WRITER_BATCH_SIZE = "parquet_writer_batch_size";
private static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout";
private static final String STATISTICS_ENABLED = "statistics_enabled";
public static final String EXTENDED_STATISTICS_ENABLED = "experimental_extended_statistics_enabled";
public static final String EXTENDED_STATISTICS_ENABLED = "extended_statistics_enabled";
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size";
private static final String HIVE_CATALOG_NAME = "hive_catalog_name";
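The session property is renamed here to drop the experimental prefix. For context, Trino session properties are declared through PropertyMetadata factory methods; the sketch below shows what such a registration typically looks like. It is not the code from this PR: the class, description wording, and constructor are assumptions standing in for the real IcebergSessionProperties wiring.

import com.google.common.collect.ImmutableList;
import io.trino.spi.session.PropertyMetadata;

import java.util.List;

import static io.trino.spi.session.PropertyMetadata.booleanProperty;

// Illustrative sketch, not from this PR: a minimal session-properties provider
// registering the renamed property.
public class ExtendedStatisticsSessionPropertiesSketch
{
    public static final String EXTENDED_STATISTICS_ENABLED = "extended_statistics_enabled";

    private final List<PropertyMetadata<?>> sessionProperties;

    public ExtendedStatisticsSessionPropertiesSketch(boolean extendedStatisticsEnabledDefault)
    {
        this.sessionProperties = ImmutableList.<PropertyMetadata<?>>of(booleanProperty(
                EXTENDED_STATISTICS_ENABLED,
                "Enable collection and use of extended statistics",
                extendedStatisticsEnabledDefault,
                false));
    }

    public List<PropertyMetadata<?>> getSessionProperties()
    {
        return sessionProperties;
    }
}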
@@ -160,6 +160,7 @@ public TableType getTableType()
return tableType;
}

// Empty only when reading from a table that has no snapshots yet.
@JsonProperty
public Optional<Long> getSnapshotId()
{