@@ -77,6 +77,12 @@ default ComputeTableStats computeTableStats(Table table) {
        this.getClass().getName() + " does not implement computeTableStats");
  }

  /** Instantiates an action to compute partition stats. */
  default ComputePartitionStats computePartitionStats(Table table) {
    throw new UnsupportedOperationException(
        this.getClass().getName() + " does not implement computePartitionStats");
  }

  /** Instantiates an action to rewrite all absolute paths in table metadata. */
  default RewriteTablePath rewriteTablePath(Table table) {
    throw new UnsupportedOperationException(
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import org.apache.iceberg.PartitionStatisticsFile;

/**
 * An action that computes and writes the partition statistics of an Iceberg table. The current
 * snapshot is used by default.
 */
public interface ComputePartitionStats
    extends Action<ComputePartitionStats, ComputePartitionStats.Result> {
Comment on lines +27 to +28

Contributor @amogh-jahagirdar (May 16, 2025):
I'm late to this PR, so my apologies if this has already been discussed, but have we considered just adding the ability to compute partition stats to the existing ComputeTableStats action? I'm a bit wary of adding too many actions, especially in this case, where an external Spark user has to know that there are two separate actions for "stats" in general.

In my head, something like "compute table stats" plus a partition-stats option on that API seems better.

I know the actual stats files are of course different, but I'm mainly thinking from an action and API perspective: we shouldn't expose too many different concepts. The action implementation can hide the complexity of the "how" for both types of stats, and the result type of the existing procedure would include both types of files. It may not be a big deal for sophisticated users who know when to use which, but I think many people will just want to generate stats, and the sophisticated users can then drill down into which ones they want and when to run them.

Of course, if we do add flags to the existing procedure, we have to think through the default behavior when they are not specified (preserving compatibility) and what additional modes we offer.

Thoughts @ajantha-bhat @karuppayya @nastra @aokolnychyi @rdblue?

Member Author:

I did analyze this option while adding the Spark action.

First, the name ComputeTableStatsProcedure clearly says it computes table-level statistics; plugging partition-level statistics in there didn't seem logical. Also, it takes column arguments to specify which columns need table-level stats, which is currently not applicable to partition stats.

Partition stats supports incremental compute and may need an option like force refresh in the call procedure. So the syntax and input arguments differ by functionality (one needs column names, the other a boolean).

Lastly, as you said, they are independent stats, so I feel having an action to compute each independently is better. Spark CBO is only integrated with table-level stats; there is no interface to use partition-level stats. This action is mainly for the use cases mentioned in the PR description.

Contributor:

Thanks @amogh-jahagirdar for bringing this up.
Though we don't need to follow other engines, I can see examples where ANALYZE TABLE is used for both table-level and partition-level stats:

  1. Hive
  2. Spark
  3. Trino

I think it would be good to follow a similar pattern for the Iceberg action and procedure, so that users don't have to think about gathering Iceberg stats differently than in other engines. But as highlighted, this comes with the complexity of adding conditional code.

@ajantha-bhat, I think force refresh would be useful for the current table stats action as well. Also, we would eventually need to support incremental stats for the current table stats too.

Member Author:

@karuppayya, if you look at Spark, Hive, and Trino, they have unified stats in one compute.

In Iceberg, table statistics computes just NDV; there are no column-level stats aggregates, and we would need to read manifests for those. Also, partition stats should eventually include column-level stats so that we can aggregate them into table-level stats; @deniskuzZ has a proposal for that.

Currently, table stats is not complete and partition stats is also very different, so I prefer keeping them separate. If we unify them in 2.0, we can have a single procedure and deprecate these.

Contributor:

I am not sure completeness of functionality should determine this. I am mainly coming from the end-user perspective of having to use different actions for stats, whereas most other engines/formats do it in one place. But I don't have a strong opinion on this; I am fine either way.
It would be good to get @aokolnychyi's thoughts here since he raised this earlier.

Contributor @amogh-jahagirdar (Jul 2, 2025):

@ajantha-bhat

Did we think about what happens if one of the computes fails? If table stats compute failed but partition stats compute succeeded, and vice versa.

Yeah, my take is that stats computation should be best effort: say someone configures both table and partition stats to run, and whichever one runs first fails, we just move on to the next. Of course, if there is any kind of failure, we should clearly indicate that to the user at the end, but I think the partial-failure case you're referencing can be handled by treating all stats generation as best effort in the implementation.

We have to add a lot of validations, like what if the user configures columns and calls withOnlyPartitionStats.

I guess I'm not understanding why the only additional validation would not just be Preconditions.checkArgument(columns.isEmpty()) in withOnlyPartitionStats. Are there any validations beyond this?

@RussellSpitzer
My breakdown of the pros/cons; @ajantha-bhat, feel free to add anything I'm missing.

Combined action with different options:

Pros:

  1. I think this better aligns with what exists in other engines like Hive, Spark, and Trino, where column stats and partition stats are effectively collected through the same SQL. I don't really see a distinction here between the procedure and the action: I feel we should generally try to keep the action API surface area reduced, just as we do with the procedure; it's just that the procedure is SQL-oriented instead of programmatic.

  2. In the long run, many of the options that apply to partition stats will also apply to normal table stats; e.g., we will probably also want a force refresh and incremental computation. So if we know many of the same options will apply at some point, it seems better to just coalesce the API here.

Cons:

  1. There's a bit of complexity in validating multiple options on the API that won't work together (like partitionStatsOnly and specifying columns). I'm doubtful how complex this really is; it just seems like a simple preconditions check, but I could be missing something.

Separate actions:

Pros:
The main argument I can think of for separate procedures is that, for people familiar with the details of the statistics and their granularities, separate procedures are clearer, since (at least as of today) the stats are different.

Cons:
The main argument I have against separate procedures is that if the options will converge in the long run, it seems compelling to keep them combined (going back to my preference for keeping the API surface area as minimal as possible). I think it would be better to do this from the start rather than separate them and undo it later.

In the end, if folks feel strongly about keeping it separate, I think I'm OK; I just want to make sure we're being thoughtful about the implications.

Member:

The argument I feel strongest about here is the mixing of options that are invalid together and that we can only catch at runtime. IMHO, we should aim for APIs with as few gotchas as possible. If the API can't hide the fact that there are two very different things happening under the hood, I don't think they should be combined.

So I think we could keep them in the same action, but only if the APIs are a little better segregated.

Perhaps instead of "columns" you have something like .withTableLevelNDVs("columns").withPartitionStats().

These are breaking changes, I know, but they make clear that there are two different, mutually exclusive things happening.
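
For illustration, that segregated shape might look roughly like the sketch below. This is hypothetical: ComputeStats, withTableLevelNDVs, and withPartitionStats are invented names from the discussion, not code in this PR.

```java
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.actions.Action;

// Hypothetical combined action: the two stats types are explicit on the
// builder, so a column list can never be mixed into the partition-stats path.
interface ComputeStats extends Action<ComputeStats, ComputeStats.Result> {

  // Table-level NDV stats for the given columns (all columns if omitted).
  ComputeStats withTableLevelNDVs(String... columns);

  // Partition-level stats; takes no column arguments by design.
  ComputeStats withPartitionStats();

  interface Result {
    // Either file may be null if that stats type was not requested
    // or its computation failed (best-effort semantics).
    StatisticsFile tableStatisticsFile();

    PartitionStatisticsFile partitionStatisticsFile();
  }
}
```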

Member Author:

Thanks @RussellSpitzer.

I don't want to introduce breaking changes right now. Plus, partition stats can evolve; it could even include column-level stats per partition in the future. Once everything is finalized, we can add a unified API if required and deprecate the existing ones. Until then, I think it is good to keep separate APIs.

I agree with:

If the API can't hide the fact that there are two very different things happening under the hood I don't think they should be combined.

@amogh-jahagirdar: Thoughts?

Contributor:

If the API can't hide the fact that there are two very different things happening under the hood I don't think they should be combined.

I definitely agree with this statement, but I do think it should be quite possible to hide the fact that there are two different things happening under the hood.

I'm generally OK (+0.5) with moving ahead as laid out here, and I also want to avoid breaking changes. @RussellSpitzer @ajantha-bhat, if we want to move forward with this PR, feel free!

Member Author:

Thanks @amogh-jahagirdar and @RussellSpitzer for the discussion and for helping to conclude this.
I understand that we have a chance to unify this in the future, once partition stats have matured, by providing a new unified action (to avoid breaking changes).

For now, please approve the PR if you are OK with it; we will merge it today. I just rebased it.

  /**
   * Choose the table snapshot to compute partition stats.
   *
   * @param snapshotId long ID of the snapshot for which stats need to be computed
Contributor:

nit

Suggested change:
- * @param snapshotId long ID of the snapshot for which stats need to be computed
+ * @param snapshotId long ID of the snapshot for which stats need to be computed, by default the current snapshot is used

And remove it from the class javadoc; that way it will be in sync with org.apache.iceberg.actions.ComputeTableStats#snapshot.

Member Author:

It was like that before, but I got an explicit comment from @nastra to move that away from the snapshot API.

More info: #12450 (comment)

   * @return this for method chaining
   */
  ComputePartitionStats snapshot(long snapshotId);
Contributor:

Do we need an API to force the computation, in case the existing stats file is corrupted?

Member Author:

The user can unregister the stats from the table using table.updatePartitionStatistics().removePartitionStatistics(snapshotId).commit(), which forces a recompute when the compute API is called again.
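
A minimal sketch of that unregister step, assuming an existing Table table and the target snapshotId:

```java
// Drop the registered partition stats file for this snapshot; the next
// computePartitionStats call will then recompute instead of returning
// the existing file.
table.updatePartitionStatistics()
    .removePartitionStatistics(snapshotId)
    .commit();
```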

Contributor:

As an end user invoking a SQL procedure, I would expect this to be available in the procedure, instead of having to use the Java API to achieve it.
Could we do the same (remove and compute) in the Spark action when forced (and eventually add it to the procedure)?

Member Author:

The Spark action is similar to the API, so the user can do it themselves (unregister and call again).

For the call procedure, I will add an option to force refresh.

Contributor:

I think the action is the delegate for the procedure: the procedure does input validation and converts the result (from the action's execution) to an internal row.
The business logic should remain in the action. In the procedure, we would set the force-refresh boolean on the action, and the action decides how to use it.

Member Author:

I think the action is the delegate for the procedure

I don't think that is a strict guideline. Procedures are just a way to provide functionality from SQL; they can combine one or more APIs.

I added the force compute. I agree that SQL users need a way to clear stats and force a compute in case of corruption (even though it is very rare) without depending on the Java API.

I was neutral about adding it to the Spark action, as users of the Spark action already have API access, but I want to move forward with this work, so I added it.

Contributor:

Sorry to go back and forth on this.
Since the probability of the statistics file becoming corrupt is low, handling it in the action introduces additional complexity (more commits), and there is a workaround of removing the stats file via the table API, feel free to leave this out of this change.


  /** The result of partition statistics collection. */
  interface Result {

    /** Returns the statistics file, or null if no statistics were collected. */
    PartitionStatisticsFile statisticsFile();
  }
}
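
As a usage sketch, assuming the Spark provider registered later in this PR and an existing SparkSession spark and Table table:

```java
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.spark.actions.SparkActions;

// Compute and register partition stats for the table's current snapshot.
PartitionStatisticsFile statsFile =
    SparkActions.get(spark)
        .computePartitionStats(table)
        .execute()
        .statisticsFile();

// statsFile is null when the table has no snapshot to compute stats for.
```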
@@ -166,6 +166,12 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long
      stats =
          computeStats(table, snapshot.allManifests(table.io()), false /* incremental */).values();
    } else {
      if (statisticsFile.snapshotId() == snapshotId) {
        // no-op
        LOG.info("Returning existing statistics file for snapshot {}", snapshotId);
        return statisticsFile;
      }

      try {
        stats = computeAndMergeStatsIncremental(table, snapshot, partitionType, statisticsFile);
      } catch (InvalidStatsFileException exception) {
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import javax.annotation.Nullable;
import org.apache.iceberg.PartitionStatisticsFile;
import org.immutables.value.Value;

@Value.Enclosing
@SuppressWarnings("ImmutablesStyle")
@Value.Style(
    typeImmutableEnclosing = "ImmutableComputePartitionStats",
    visibilityString = "PUBLIC",
    builderVisibilityString = "PUBLIC")
interface BaseComputePartitionStats extends ComputePartitionStats {

  @Value.Immutable
  interface Result extends ComputePartitionStats.Result {
    @Override
    @Nullable
    PartitionStatisticsFile statisticsFile();
  }
}
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.actions;

import java.io.IOException;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.PartitionStatsHandler;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ComputePartitionStats;
import org.apache.iceberg.actions.ImmutableComputePartitionStats;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Computes partition stats incrementally, from the most recent snapshot that has a partition stats
 * file up to the given snapshot (the current snapshot if not specified), and writes the combined
 * result into a {@link PartitionStatisticsFile} after merging the stats. Does a full computation if
 * no previous statistics file exists. Also registers the {@link PartitionStatisticsFile} in table
 * metadata.
 */
public class ComputePartitionStatsSparkAction
    extends BaseSparkAction<ComputePartitionStatsSparkAction> implements ComputePartitionStats {

  private static final Logger LOG = LoggerFactory.getLogger(ComputePartitionStatsSparkAction.class);
  private static final Result EMPTY_RESULT =
      ImmutableComputePartitionStats.Result.builder().build();

  private final Table table;
  private Snapshot snapshot;

  ComputePartitionStatsSparkAction(SparkSession spark, Table table) {
    super(spark);
    this.table = table;
    this.snapshot = table.currentSnapshot();
  }

  @Override
  protected ComputePartitionStatsSparkAction self() {
    return this;
  }

  @Override
  public ComputePartitionStats snapshot(long newSnapshotId) {
    Snapshot newSnapshot = table.snapshot(newSnapshotId);
Contributor:

Before fetching the table metadata, we could check whether the new snapshot ID is equal to the initial snapshot ID.

Member Author:

Can you elaborate? What do you mean by the initial snapshot ID?

The user can pass any snapshot ID, and stats will be computed and committed for that particular snapshot. If the user has not set the snapshot, it uses table.currentSnapshot() from the constructor.

This design is the same as the existing ComputeTableStatsSparkAction.

Contributor:

The 'initial snapshot' I referred to is the one obtained when constructing the ComputePartitionStatsSparkAction class:

this.snapshot = table.currentSnapshot();

When the newly passed snapshot ID is identical to this initial one, we can avoid fetching the snapshot from the Iceberg table metadata; a small optimization :)
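
A sketch of that suggested short-circuit (illustrative only; the PR keeps the simpler lookup, which continues below):

```java
@Override
public ComputePartitionStats snapshot(long newSnapshotId) {
  // Skip the metadata lookup when the requested snapshot is already the one
  // selected at construction time (the table's current snapshot).
  if (snapshot != null && snapshot.snapshotId() == newSnapshotId) {
    return this;
  }

  Snapshot newSnapshot = table.snapshot(newSnapshotId);
  Preconditions.checkArgument(newSnapshot != null, "Snapshot not found: %s", newSnapshotId);
  this.snapshot = newSnapshot;
  return this;
}
```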

    Preconditions.checkArgument(newSnapshot != null, "Snapshot not found: %s", newSnapshotId);
    this.snapshot = newSnapshot;
    return this;
  }

  @Override
  public Result execute() {
    if (snapshot == null) {
      LOG.info("No snapshot to compute partition stats for table {}", table.name());
      return EMPTY_RESULT;
    }

    JobGroupInfo info = newJobGroupInfo("COMPUTE-PARTITION-STATS", jobDesc());
    return withJobGroupInfo(info, this::doExecute);
  }

  private Result doExecute() {
    LOG.info("Computing partition stats for {} (snapshot {})", table.name(), snapshot.snapshotId());
Contributor:

nit

Suggested change:
- LOG.info("Computing partition stats for {} (snapshot {})", table.name(), snapshot.snapshotId());
+ LOG.info("Computing partition stats for {} (snapshot={})", table.name(), snapshot.snapshotId());

Contributor:

I think I'd prefer it without the =, and we can probably get away from the nested parentheses in a log statement.

"Computing partition stats for table {} snapshot {}" is sufficient info, IMO.

    PartitionStatisticsFile statisticsFile;
    try {
      statisticsFile = PartitionStatsHandler.computeAndWriteStatsFile(table, snapshot.snapshotId());
Member Author:

We are using the local algorithm instead of a distributed one: we did a POC and JMH benchmarks and found that the serialization cost was high for the distributed approach, hence the local algorithm.

More info:
#9437 (comment)

and #9437 (comment)

    } catch (IOException e) {
      throw new RuntimeIOException(e);
    }

    if (statisticsFile == null) {
      return EMPTY_RESULT;
Contributor:

What happens when you truncate a table: do we return an empty stats file or null? Also add a test.

Member Author:

Do you mean truncating a whole table? Is there a quick Spark API to do that? Since Iceberg uses DELETE FROM, actual files are not deleted during truncation; stats will still be computed with delete counters. It will be similar to the existing delete tests.

Contributor:

What happens after a major compaction?
Is an empty stats file generated, or is no file created? It would be good to call this out.

Member Author @ajantha-bhat (May 16, 2025):

There are test cases at the API level for compaction. I didn't add the same test case here, as this is just a wrapper and the core functionality already tests it.

Compaction writes new stats, and the data file count changes per partition after compaction.

Rewrite manifests is also covered by a test.

Check the core API PR for test coverage: #12629

    }

    table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
Contributor:

If the statistics for a particular snapshot were already computed and the user triggers the action a second time, what is the behavior?

  1. Does the action recompute, or does it return the existing statistics file?
  2. If it returns the existing file from the first computation, how does the user know that the stats in the result are from a prior computation? Do we need a field in the result to denote this, like CREATED (when computed fresh) and EXISTING (when it returns an already computed one)?
  3. Related to 2, if we are returning an existing stats file, do we need a way to force the computation?

Member Author:

If the statistics for a particular snapshot were already computed and the user triggers the action a second time, what is the behavior?

Currently, it reads the stats file (of the same snapshot) and writes it into a new stats file, without the additional compute of reading manifests. I will change it to a no-op that returns the same file, with an info log that a stats file already exists for the snapshot.

do we need a way to force the computation

The user can unregister the stats from the table using table.updatePartitionStatistics().removePartitionStatistics(snapshotId).commit(), which forces a recompute when the compute API is called again.

Contributor:

change it to no-op

+1
I will let you take the call on whether we need to return whether it was created or an existing file, or do that in a subsequent PR.

For a SQL user, there is no way to figure out why the command completes faster (when returning an existing file) versus taking time (when computing a new one) without a status.

This might not be an issue when using the APIs, since the user can check whether a stats file is already available via the table API (org.apache.iceberg.Table#partitionStatisticsFiles).
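
That API-side check might look like this (a sketch; table and snapshotId are assumed):

```java
// True if a partition stats file is already registered for this snapshot.
boolean hasPartitionStats =
    table.partitionStatisticsFiles().stream()
        .anyMatch(file -> file.snapshotId() == snapshotId);
```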

Member Author:

For a SQL user, there is no way to figure out why the command completes faster (when returning an existing file) versus taking time (when computing a new one) without a status.

Maybe we need to provide a metadata table that lists the statistics files (both table and partition) in the future.

we need to return whether it was created or an existing file

I think adding a status is overkill; users would then have to think about freshness, etc. I will make it a no-op and log why it is a no-op.

Member Author:

Added the no-op and logs, plus a test case.

    return ImmutableComputePartitionStats.Result.builder().statisticsFile(statisticsFile).build();
  }

  private String jobDesc() {
    return String.format(
        "Computing partition stats for %s (snapshot=%s)", table.name(), snapshot.snapshotId());
  }
}
@@ -20,6 +20,7 @@

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ActionsProvider;
import org.apache.iceberg.actions.ComputePartitionStats;
import org.apache.iceberg.actions.ComputeTableStats;
import org.apache.iceberg.actions.RemoveDanglingDeleteFiles;
import org.apache.iceberg.spark.Spark3Util;
@@ -104,6 +105,11 @@ public ComputeTableStats computeTableStats(Table table) {
    return new ComputeTableStatsSparkAction(spark, table);
  }

  @Override
  public ComputePartitionStats computePartitionStats(Table table) {
    return new ComputePartitionStatsSparkAction(spark, table);
  }

  @Override
  public RemoveDanglingDeleteFiles removeDanglingDeleteFiles(Table table) {
    return new RemoveDanglingDeletesSparkAction(spark, table);