Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,10 @@ default RewritePositionDeleteFiles rewritePositionDeletes(Table table) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement rewritePositionDeletes");
}

/** Instantiates an action to compute table stats. */
default ComputeTableStats computeTableStats(Table table) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement computeTableStats");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import org.apache.iceberg.StatisticsFile;

/** An action that collects statistics of an Iceberg table and writes to Puffin files. */
public interface ComputeTableStats extends Action<ComputeTableStats, ComputeTableStats.Result> {
/**
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: newline

* Choose the set of columns to collect stats, by default all columns are chosen.
*
* @param columns a set of column names to be analyzed
* @return this for method chaining
*/
ComputeTableStats columns(String... columns);

/**
* Choose the table snapshot to compute stats, by default the current snapshot is used.
*
* @param snapshotId long ID of the snapshot for which stats need to be computed
* @return this for method chaining
*/
ComputeTableStats snapshot(long snapshotId);

/** The result of table statistics collection. */
interface Result {

/** Returns statistics file or none if no statistics were collected. */
StatisticsFile statisticsFile();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -37,6 +38,13 @@ public static BlobMetadata from(org.apache.iceberg.puffin.BlobMetadata puffinMet
puffinMetadata.properties());
}

public static List<BlobMetadata> from(
Collection<org.apache.iceberg.puffin.BlobMetadata> puffinMetadataList) {
return puffinMetadataList.stream()
.map(GenericBlobMetadata::from)
.collect(ImmutableList.toImmutableList());
}

private final String type;
private final long sourceSnapshotId;
private final long sourceSnapshotSequenceNumber;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import javax.annotation.Nullable;
import org.apache.iceberg.StatisticsFile;
import org.immutables.value.Value;

@Value.Enclosing
@SuppressWarnings("ImmutablesStyle")
@Value.Style(
typeImmutableEnclosing = "ImmutableComputeTableStats",
visibilityString = "PUBLIC",
builderVisibilityString = "PUBLIC")
interface BaseComputeTableStats extends ComputeTableStats {

@Value.Immutable
interface Result extends ComputeTableStats.Result {
@Override
@Nullable
StatisticsFile statisticsFile();
}
}
2 changes: 2 additions & 0 deletions spark/v3.5/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
implementation project(':iceberg-parquet')
implementation project(':iceberg-arrow')
implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}:${libs.versions.scala.collection.compat.get()}")
implementation("org.apache.datasketches:datasketches-java:${libs.versions.datasketches.get()}")
if (scalaVersion == '2.12') {
// scala-collection-compat_2.12 pulls scala 2.12.17 and we need 2.12.18 for JDK 21 support
implementation 'org.scala-lang:scala-library:2.12.18'
Expand Down Expand Up @@ -292,6 +293,7 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
relocate 'com.carrotsearch', 'org.apache.iceberg.shaded.com.carrotsearch'
relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap'
relocate 'org.apache.datasketches', 'org.apache.iceberg.shaded.org.apache.datasketches'

archiveClassifier.set(null)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.actions;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.actions.ComputeTableStats;
import org.apache.iceberg.actions.ImmutableComputeTableStats;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Computes the statistics of the given columns and stores it as Puffin files. */
public class ComputeTableStatsSparkAction extends BaseSparkAction<ComputeTableStatsSparkAction>
implements ComputeTableStats {

private static final Logger LOG = LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
private static final Result EMPTY_RESULT = ImmutableComputeTableStats.Result.builder().build();

private final Table table;
private List<String> columns;
private Snapshot snapshot;

ComputeTableStatsSparkAction(SparkSession spark, Table table) {
super(spark);
this.table = table;
this.snapshot = table.currentSnapshot();
}

@Override
protected ComputeTableStatsSparkAction self() {
return this;
}

@Override
public ComputeTableStats columns(String... newColumns) {
Preconditions.checkArgument(
newColumns != null && newColumns.length > 0, "Columns cannot be null/empty");
this.columns = ImmutableList.copyOf(ImmutableSet.copyOf(newColumns));
return this;
}

@Override
public ComputeTableStats snapshot(long newSnapshotId) {
Snapshot newSnapshot = table.snapshot(newSnapshotId);
Preconditions.checkArgument(newSnapshot != null, "Snapshot not found: %s", newSnapshotId);
this.snapshot = newSnapshot;
return this;
}

@Override
public Result execute() {
if (snapshot == null) {
LOG.info("No snapshot to compute stats for table {}", table.name());
return EMPTY_RESULT;
}
validateColumns();
JobGroupInfo info = newJobGroupInfo("COMPUTE-TABLE-STATS", jobDesc());
return withJobGroupInfo(info, this::doExecute);
}

private Result doExecute() {
LOG.info(
"Computing stats for columns {} in {} (snapshot {})",
columns(),
table.name(),
snapshotId());
List<Blob> blobs = generateNDVBlobs();
StatisticsFile statisticsFile = writeStatsFile(blobs);
table.updateStatistics().setStatistics(snapshotId(), statisticsFile).commit();
return ImmutableComputeTableStats.Result.builder().statisticsFile(statisticsFile).build();
}

private StatisticsFile writeStatsFile(List<Blob> blobs) {
LOG.info("Writing stats for table {} for snapshot {}", table.name(), snapshotId());
OutputFile outputFile = table.io().newOutputFile(outputPath());
try (PuffinWriter writer = Puffin.write(outputFile).createdBy(appIdentifier()).build()) {
blobs.forEach(writer::add);
writer.finish();
return new GenericStatisticsFile(
snapshotId(),
outputFile.location(),
writer.fileSize(),
writer.footerSize(),
GenericBlobMetadata.from(writer.writtenBlobsMetadata()));
} catch (IOException e) {
throw new RuntimeIOException(e);
}
}

private List<Blob> generateNDVBlobs() {
return NDVSketchUtil.generateBlobs(spark(), table, snapshot, columns());
}

private List<String> columns() {
if (columns == null) {
Schema schema = table.schemas().get(snapshot.schemaId());
this.columns =
schema.columns().stream()
.filter(nestedField -> nestedField.type().isPrimitiveType())
.map(Types.NestedField::name)
.collect(Collectors.toList());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not be enough as we have to dive into nested fields and collect all leaf columns. That said, I am OK addressing this in a follow-up PR to limit the scope and get this in. At least, it will no longer fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the ndv stats of nested leaf field be useful with CBO?
If so I can add it in a follow-up PR.
I tried to follow Trino, whether the columns with. nested schema were skipped

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. I somehow thought Trino collected stats for all scalar columns. Let me look into this after.

}
return columns;
}

private void validateColumns() {
Schema schema = table.schemas().get(snapshot.schemaId());
Preconditions.checkArgument(!columns().isEmpty(), "No columns found to compute stats");
for (String columnName : columns()) {
Types.NestedField field = schema.findField(columnName);
Preconditions.checkArgument(field != null, "Can't find column %s in %s", columnName, schema);
Preconditions.checkArgument(
field.type().isPrimitiveType(),
"Can't compute stats on non-primitive type column: %s (%s)",
columnName,
field.type());
}
}

private String appIdentifier() {
String icebergVersion = IcebergBuild.fullVersion();
String sparkVersion = spark().version();
return String.format("Iceberg %s Spark %s", icebergVersion, sparkVersion);
}

private long snapshotId() {
return snapshot.snapshotId();
}

private String jobDesc() {
return String.format(
"Computing table stats for %s (snapshot_id=%s, columns=%s)",
table.name(), snapshotId(), columns());
}

private String outputPath() {
TableOperations operations = ((HasTableOperations) table).operations();
String fileName = String.format("%s-%s.stats", snapshotId(), UUID.randomUUID());
return operations.metadataFileLocation(fileName);
}
}
Loading