Spark 3.4: Action to compute table stats (#11106)
karuppayya committed Sep 13, 2024
1 parent e449d34 commit 5582b0c
Showing 6 changed files with 816 additions and 0 deletions.
spark/v3.4/build.gradle (2 additions, 0 deletions)
@@ -59,6 +59,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
    implementation project(':iceberg-parquet')
    implementation project(':iceberg-arrow')
    implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}:${libs.versions.scala.collection.compat.get()}")
    implementation("org.apache.datasketches:datasketches-java:${libs.versions.datasketches.get()}")
    if (scalaVersion == '2.12') {
      // scala-collection-compat_2.12 pulls scala 2.12.17 and we need 2.12.18 for JDK 21 support
      implementation 'org.scala-lang:scala-library:2.12.18'
@@ -289,6 +290,7 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}") {
    relocate 'com.carrotsearch', 'org.apache.iceberg.shaded.com.carrotsearch'
    relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
    relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap'
    relocate 'org.apache.datasketches', 'org.apache.iceberg.shaded.org.apache.datasketches'

    archiveClassifier.set(null)
  }
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ComputeTableStatsSparkAction.java (new file, 179 additions)
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.actions;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.actions.ComputeTableStats;
import org.apache.iceberg.actions.ImmutableComputeTableStats;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Computes the statistics of the given columns and stores them in a Puffin file. */
public class ComputeTableStatsSparkAction extends BaseSparkAction<ComputeTableStatsSparkAction>
    implements ComputeTableStats {

  private static final Logger LOG = LoggerFactory.getLogger(ComputeTableStatsSparkAction.class);
  private static final Result EMPTY_RESULT = ImmutableComputeTableStats.Result.builder().build();

  private final Table table;
  private List<String> columns;
  private Snapshot snapshot;

  ComputeTableStatsSparkAction(SparkSession spark, Table table) {
    super(spark);
    this.table = table;
    this.snapshot = table.currentSnapshot();
  }

  @Override
  protected ComputeTableStatsSparkAction self() {
    return this;
  }

  @Override
  public ComputeTableStats columns(String... newColumns) {
    Preconditions.checkArgument(
        newColumns != null && newColumns.length > 0, "Columns cannot be null/empty");
    this.columns = ImmutableList.copyOf(ImmutableSet.copyOf(newColumns));
    return this;
  }

  @Override
  public ComputeTableStats snapshot(long newSnapshotId) {
    Snapshot newSnapshot = table.snapshot(newSnapshotId);
    Preconditions.checkArgument(newSnapshot != null, "Snapshot not found: %s", newSnapshotId);
    this.snapshot = newSnapshot;
    return this;
  }

  @Override
  public Result execute() {
    if (snapshot == null) {
      LOG.info("No snapshot to compute stats for table {}", table.name());
      return EMPTY_RESULT;
    }
    validateColumns();
    JobGroupInfo info = newJobGroupInfo("COMPUTE-TABLE-STATS", jobDesc());
    return withJobGroupInfo(info, this::doExecute);
  }

  private Result doExecute() {
    LOG.info(
        "Computing stats for columns {} in {} (snapshot {})",
        columns(),
        table.name(),
        snapshotId());
    List<Blob> blobs = generateNDVBlobs();
    StatisticsFile statisticsFile = writeStatsFile(blobs);
    table.updateStatistics().setStatistics(snapshotId(), statisticsFile).commit();
    return ImmutableComputeTableStats.Result.builder().statisticsFile(statisticsFile).build();
  }

  private StatisticsFile writeStatsFile(List<Blob> blobs) {
    LOG.info("Writing stats for table {} for snapshot {}", table.name(), snapshotId());
    OutputFile outputFile = table.io().newOutputFile(outputPath());
    try (PuffinWriter writer = Puffin.write(outputFile).createdBy(appIdentifier()).build()) {
      blobs.forEach(writer::add);
      writer.finish();
      return new GenericStatisticsFile(
          snapshotId(),
          outputFile.location(),
          writer.fileSize(),
          writer.footerSize(),
          GenericBlobMetadata.from(writer.writtenBlobsMetadata()));
    } catch (IOException e) {
      throw new RuntimeIOException(e);
    }
  }

  private List<Blob> generateNDVBlobs() {
    return NDVSketchUtil.generateBlobs(spark(), table, snapshot, columns());
  }

  private List<String> columns() {
    if (columns == null) {
      Schema schema = table.schemas().get(snapshot.schemaId());
      this.columns =
          schema.columns().stream()
              .filter(nestedField -> nestedField.type().isPrimitiveType())
              .map(Types.NestedField::name)
              .collect(Collectors.toList());
    }
    return columns;
  }

  private void validateColumns() {
    Schema schema = table.schemas().get(snapshot.schemaId());
    Preconditions.checkArgument(!columns().isEmpty(), "No columns found to compute stats");
    for (String columnName : columns()) {
      Types.NestedField field = schema.findField(columnName);
      Preconditions.checkArgument(field != null, "Can't find column %s in %s", columnName, schema);
      Preconditions.checkArgument(
          field.type().isPrimitiveType(),
          "Can't compute stats on non-primitive type column: %s (%s)",
          columnName,
          field.type());
    }
  }

  private String appIdentifier() {
    String icebergVersion = IcebergBuild.fullVersion();
    String sparkVersion = spark().version();
    return String.format("Iceberg %s Spark %s", icebergVersion, sparkVersion);
  }

  private long snapshotId() {
    return snapshot.snapshotId();
  }

  private String jobDesc() {
    return String.format(
        "Computing table stats for %s (snapshot_id=%s, columns=%s)",
        table.name(), snapshotId(), columns());
  }

  private String outputPath() {
    TableOperations operations = ((HasTableOperations) table).operations();
    String fileName = String.format("%s-%s.stats", snapshotId(), UUID.randomUUID());
    return operations.metadataFileLocation(fileName);
  }
}
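Once execute() commits, the new Puffin file is discoverable through the core Table API. A brief sketch of the reader side (Table.statisticsFiles() and BlobMetadata come from the existing Iceberg core API, not this diff):

// Each committed stats file records the snapshot it describes and the metadata
// of its blobs: here, one theta sketch per column, each carrying the "ndv"
// property written by NDVSketchUtil below.
for (StatisticsFile statsFile : table.statisticsFiles()) {
  long describedSnapshot = statsFile.snapshotId();
  statsFile.blobMetadata().forEach(blob -> blob.properties().get("ndv"));
}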
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchUtil.java (new file, 93 additions)
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.actions;

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.CompactSketch;
import org.apache.datasketches.theta.Sketch;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.PuffinCompressionCodec;
import org.apache.iceberg.puffin.StandardBlobTypes;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.stats.ThetaSketchAgg;

public class NDVSketchUtil {

  private NDVSketchUtil() {}

  public static final String APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY = "ndv";

  static List<Blob> generateBlobs(
      SparkSession spark, Table table, Snapshot snapshot, List<String> columns) {
    Row sketches = computeNDVSketches(spark, table, snapshot, columns);
    Schema schema = table.schemas().get(snapshot.schemaId());
    List<Blob> blobs = Lists.newArrayList();
    for (int i = 0; i < columns.size(); i++) {
      Types.NestedField field = schema.findField(columns.get(i));
      Sketch sketch = CompactSketch.wrap(Memory.wrap((byte[]) sketches.get(i)));
      blobs.add(toBlob(field, sketch, snapshot));
    }
    return blobs;
  }

  private static Blob toBlob(Types.NestedField field, Sketch sketch, Snapshot snapshot) {
    return new Blob(
        StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1,
        ImmutableList.of(field.fieldId()),
        snapshot.snapshotId(),
        snapshot.sequenceNumber(),
        ByteBuffer.wrap(sketch.toByteArray()),
        PuffinCompressionCodec.ZSTD,
        ImmutableMap.of(
            APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY,
            String.valueOf((long) sketch.getEstimate())));
  }

  private static Row computeNDVSketches(
      SparkSession spark, Table table, Snapshot snapshot, List<String> colNames) {
    return spark
        .read()
        .format("iceberg")
        .option(SparkReadOptions.SNAPSHOT_ID, snapshot.snapshotId())
        .load(table.name())
        .select(toAggColumns(colNames))
        .first();
  }

  private static Column[] toAggColumns(List<String> colNames) {
    return colNames.stream().map(NDVSketchUtil::toAggColumn).toArray(Column[]::new);
  }

  private static Column toAggColumn(String colName) {
    ThetaSketchAgg agg = new ThetaSketchAgg(colName);
    return new Column(agg.toAggregateExpression());
  }
}
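To make the byte-level contract concrete, here is a standalone sketch of the DataSketches round trip that generateBlobs relies on; the values are illustrative, and ThetaSketchAgg (added elsewhere in this commit) is what actually produces the per-column byte[]:

// Requires org.apache.datasketches.theta.UpdateSketch in addition to the
// imports above. Build a theta sketch, serialize it in compact form, then
// wrap the bytes to read the estimate, mirroring CompactSketch.wrap(...) in
// generateBlobs.
UpdateSketch sketch = UpdateSketch.builder().build();
for (long value = 0; value < 1000; value++) {
  sketch.update(value);
}
byte[] bytes = sketch.compact().toByteArray();
Sketch wrapped = CompactSketch.wrap(Memory.wrap(bytes));
long ndv = (long) wrapped.getEstimate(); // approximately 1000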
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java (6 additions, 0 deletions)
@@ -20,6 +20,7 @@

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ActionsProvider;
import org.apache.iceberg.actions.ComputeTableStats;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
import org.apache.spark.sql.SparkSession;
@@ -96,4 +97,9 @@ public DeleteReachableFilesSparkAction deleteReachableFiles(String metadataLocation) {
  public RewritePositionDeleteFilesSparkAction rewritePositionDeletes(Table table) {
    return new RewritePositionDeleteFilesSparkAction(spark, table);
  }

  @Override
  public ComputeTableStats computeTableStats(Table table) {
    return new ComputeTableStatsSparkAction(spark, table);
  }
}
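With the action registered above, callers reach it through the standard actions entry point. A minimal usage sketch (the spark session, table, and column names are illustrative):

// Compute NDV sketches for two columns of the table's current snapshot and
// commit the resulting Puffin statistics file to table metadata.
ComputeTableStats.Result result =
    SparkActions.get(spark)
        .computeTableStats(table)
        .columns("id", "data") // illustrative; defaults to all primitive columns
        .execute();
result.statisticsFile(); // the committed StatisticsFile (null if the table had no snapshot)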