Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,12 @@ acceptedBreaks:
- code: "java.class.removed"
old: "interface org.apache.iceberg.view.SQLViewRepresentation"
justification: "Moving from iceberg-api to iceberg-core"
- code: "java.method.addedToInterface"
new: "method java.lang.String org.apache.iceberg.Snapshot::partitionStatsFileLocation()"
justification: "Track partition stats from Snapshot"
- code: "java.method.addedToInterface"
new: "method java.util.List<org.apache.iceberg.PartitionStatisticsFile> org.apache.iceberg.Table::partitionStatisticsFiles()"
justification: "Track partition stats from TableMetadata"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.catalog.Namespace org.apache.iceberg.view.ViewVersion::defaultNamespace()"
justification: "Acceptable break due to updating View APIs and the View Spec"
Expand Down
1 change: 0 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ __Modules__
`iceberg-core`
`iceberg-data`
`iceberg-orc`
`iceberg-parquet`

Changes to public interfaces and classes in the subprojects listed above require a deprecation cycle of one minor
release. These projects contain common and internal code used by other projects and can evolve within a major release.
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ Iceberg table support is organized in library modules:
* `iceberg-common` contains utility classes used in other modules
* `iceberg-api` contains the public Iceberg API
* `iceberg-core` contains implementations of the Iceberg API and support for Avro data files, **this is what processing engines should depend on**
* `iceberg-parquet` is an optional module for working with tables backed by Parquet files
* `iceberg-arrow` is an optional module for reading Parquet into Arrow memory
* `iceberg-orc` is an optional module for working with tables backed by ORC files
* `iceberg-hive-metastore` is an implementation of Iceberg tables backed by the Hive metastore Thrift client
Expand Down
44 changes: 44 additions & 0 deletions api/src/main/java/org/apache/iceberg/PartitionStatisticsFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.io.Serializable;

/**
 * Represents a partition statistics file in the table's default format that can be used to read
 * table data more efficiently.
 *
 * <p>Statistics are informational. A reader can choose to ignore statistics information. Statistics
 * support is not required to read the table correctly.
 */
public interface PartitionStatisticsFile extends Serializable {
  /** Returns the ID of the Iceberg table snapshot this partition statistics file is associated with. */
  long snapshotId();

  /**
   * Returns the fully qualified path to the file, suitable for constructing a Hadoop Path. Never
   * null.
   */
  String path();

  /**
   * Returns the maximum data sequence number of the Iceberg table's snapshot the partition
   * statistics were computed from.
   */
  long maxDataSequenceNumber();
}
6 changes: 6 additions & 0 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,10 @@ default Iterable<DeleteFile> removedDeleteFiles(FileIO io) {
default Integer schemaId() {
return null;
}

/**
 * Returns the partition stats file location recorded in this snapshot's summary, or null if this
 * information is not available.
 *
 * <p>NOTE(review): this is a non-default method added to an existing public interface — a
 * source-breaking change for external implementers; confirm it is covered by an accepted revapi
 * break.
 */
String partitionStatsFileLocation();
}
14 changes: 14 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,17 @@ default UpdateStatistics updateStatistics() {
"Updating statistics is not supported by " + getClass().getName());
}

/**
 * Create a new {@link UpdatePartitionStatistics update partition statistics API} to add or remove
 * partition statistics files in this table.
 *
 * @return a new {@link UpdatePartitionStatistics}
 * @throws UnsupportedOperationException if this implementation does not support updating
 *     partition statistics (the default behavior)
 */
default UpdatePartitionStatistics updatePartitionStatistics() {
  throw new UnsupportedOperationException(
      "Updating partition statistics is not supported by " + getClass().getName());
}

/**
* Create a new {@link ExpireSnapshots expire API} to manage snapshots in this table and commit.
*
Expand Down Expand Up @@ -326,6 +337,9 @@ default UpdateStatistics updateStatistics() {
*/
List<StatisticsFile> statisticsFiles();

/**
 * Returns the current partition statistics files for the table.
 *
 * <p>NOTE(review): non-default addition to a public interface — breaking for external
 * implementers; confirm the accepted revapi break covers it.
 */
List<PartitionStatisticsFile> partitionStatisticsFiles();

/**
* Returns the current refs for the table
*
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ default UpdateStatistics updateStatistics() {
"Updating statistics is not supported by " + getClass().getName());
}

/**
 * Create a new {@link UpdatePartitionStatistics update partition statistics API} to add or remove
 * partition statistics files in this table.
 *
 * @return a new {@link UpdatePartitionStatistics}
 * @throws UnsupportedOperationException if this implementation does not support updating
 *     partition statistics (the default behavior)
 */
default UpdatePartitionStatistics updatePartitionStatistics() {
  throw new UnsupportedOperationException(
      "Updating partition statistics is not supported by " + getClass().getName());
}

/**
* Create a new {@link ExpireSnapshots expire API} to manage snapshots in this table.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.List;

/** API for updating partition statistics files in a table. */
public interface UpdatePartitionStatistics extends PendingUpdate<List<PartitionStatisticsFile>> {
  /**
   * Set the table's partition statistics file for the given snapshot, replacing the previous
   * partition statistics file for the snapshot if any exists.
   *
   * @param snapshotId ID of the snapshot to associate the partition statistics file with
   * @param partitionStatisticsFile the partition statistics file to set for the snapshot
   * @return this for method chaining
   */
  UpdatePartitionStatistics setPartitionStatistics(
      long snapshotId, PartitionStatisticsFile partitionStatisticsFile);

  /**
   * Remove the table's partition statistics file for the given snapshot.
   *
   * @param snapshotId ID of the snapshot whose partition statistics file should be removed
   * @return this for method chaining
   */
  UpdatePartitionStatistics removePartitionStatistics(long snapshotId);
}
13 changes: 13 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.ByteBuffers;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.objenesis.strategy.StdInstantiatorStrategy;

Expand Down Expand Up @@ -265,6 +266,18 @@ public static void serialize(final Serializable obj, final OutputStream outputSt
}
}

/**
 * Asserts that running {@code runnable} throws an exception of type {@code expected}.
 *
 * <p>When {@code containedInMessage} is non-null, additionally asserts that the thrown
 * exception's message contains it.
 */
public static void assertThrows(
    String message,
    Class<? extends Exception> expected,
    String containedInMessage,
    Runnable runnable) {
  AbstractThrowableAssert<?, ? extends Throwable> thrownAssert =
      Assertions.assertThatThrownBy(runnable::run).as(message).isInstanceOf(expected);
  if (containedInMessage != null) {
    thrownAssert.hasMessageContaining(containedInMessage);
  }
}

public static class KryoHelpers {
private KryoHelpers() {}

Expand Down
40 changes: 9 additions & 31 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ if (file("${rootDir}/iceberg-build.properties").exists()) {
}

def projectVersion = getProjectVersion()
final REVAPI_PROJECTS = ["iceberg-api", "iceberg-core", "iceberg-parquet", "iceberg-orc", "iceberg-common", "iceberg-data"]
final REVAPI_PROJECTS = ["iceberg-api", "iceberg-core", "iceberg-orc", "iceberg-common", "iceberg-data"]

allprojects {
group = "org.apache.iceberg"
Expand Down Expand Up @@ -349,6 +349,13 @@ project(':iceberg-core') {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}

implementation(libs.parquet.avro) {
exclude group: 'org.apache.avro', module: 'avro'
// already shaded by Parquet
exclude group: 'it.unimi.dsi'
exclude group: 'org.codehaus.jackson'
}

testImplementation libs.jetty.servlet
testImplementation libs.jetty.server
testImplementation libs.mockserver.netty
Expand All @@ -366,7 +373,6 @@ project(':iceberg-data') {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
api project(':iceberg-api')
implementation project(':iceberg-core')
compileOnly project(':iceberg-parquet')
compileOnly project(':iceberg-orc')
compileOnly(libs.hadoop2.common) {
exclude group: 'commons-beanutils'
Expand Down Expand Up @@ -555,7 +561,6 @@ project(':iceberg-delta-lake') {
api project(':iceberg-api')
implementation project(':iceberg-common')
implementation project(':iceberg-core')
implementation project(':iceberg-parquet')
implementation platform(libs.jackson.bom)
implementation "com.fasterxml.jackson.core:jackson-databind"
annotationProcessor libs.immutables.value
Expand Down Expand Up @@ -761,32 +766,6 @@ project(':iceberg-orc') {
}
}

project(':iceberg-parquet') {
test {
useJUnitPlatform()
}
dependencies {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
api project(':iceberg-api')
implementation project(':iceberg-core')
implementation project(':iceberg-common')

implementation(libs.parquet.avro) {
exclude group: 'org.apache.avro', module: 'avro'
// already shaded by Parquet
exclude group: 'it.unimi.dsi'
exclude group: 'org.codehaus.jackson'
}

compileOnly libs.avro.avro
compileOnly(libs.hadoop2.client) {
exclude group: 'org.apache.avro', module: 'avro'
}

testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
}
}

project(':iceberg-arrow') {
test {
useJUnitPlatform()
Expand All @@ -795,7 +774,6 @@ project(':iceberg-arrow') {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
api project(':iceberg-api')
implementation project(':iceberg-core')
implementation project(':iceberg-parquet')

implementation(libs.arrow.vector) {
exclude group: 'io.netty', module: 'netty-buffer'
Expand Down Expand Up @@ -837,7 +815,6 @@ project(':iceberg-pig') {
api project(':iceberg-api')
implementation project(':iceberg-common')
implementation project(':iceberg-core')
implementation project(':iceberg-parquet')

implementation(libs.parquet.avro) {
exclude group: 'org.apache.avro', module: 'avro'
Expand Down Expand Up @@ -905,6 +882,7 @@ project(':iceberg-nessie') {

testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
testImplementation libs.hadoop2.mapreduce.client.core

// Only there to prevent "warning: unknown enum constant SchemaType.OBJECT" compile messages
testCompileOnly libs.microprofile.openapi.api
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ public List<StatisticsFile> statisticsFiles() {
return ImmutableList.of();
}

@Override
public List<PartitionStatisticsFile> partitionStatisticsFiles() {
  // Metadata tables never carry partition statistics files of their own.
  return ImmutableList.of();
}

@Override
public Map<String, SnapshotRef> refs() {
return table().refs();
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ public UpdateStatistics updateStatistics() {
"Cannot update statistics of a " + descriptor + " table");
}

@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
  // Read-only tables reject all metadata mutations, including partition statistics updates.
  throw new UnsupportedOperationException(
      "Cannot update partition statistics of a " + descriptor + " table");
}

@Override
public ExpireSnapshots expireSnapshots() {
throw new UnsupportedOperationException(
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand All @@ -36,6 +37,7 @@
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -202,6 +204,19 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
return apply;
}

@Override
protected String writeUpdatedPartitionStats(long snapshotCreatedTimeInMillis) {
  OutputFile outputFile = newPartitionStatsFile();
  // Rewriting manifests does not change table data, so the parent snapshot's partition stats
  // entries are copied verbatim into a new stats file for the rewritten snapshot.
  // NOTE(review): snapshotCreatedTimeInMillis is unused here — presumably only needed by
  // siblings that compute fresh stats; confirm this is intentional.
  try (CloseableIterable<Partition> recordIterator = partitionStatsEntriesFromParentSnapshot()) {
    writePartitionStatsEntries(recordIterator, outputFile);
  } catch (IOException e) {
    // Wrap so callers are not forced to declare IOException for a metadata-copy step.
    throw new UncheckedIOException(e);
  }

  return outputFile.location();
}

private boolean requiresRewrite(Set<ManifestFile> currentManifests) {
if (clusterByFunc == null) {
// manifests are deleted and added directly so don't perform a rewrite
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@ private void cacheDataFileChanges(FileIO fileIO) {
this.removedDataFiles = deletes.build();
}

@Override
public String partitionStatsFileLocation() {
  // The location is stored as a snapshot summary entry; Map.get yields null when absent,
  // matching the interface contract of returning null when unavailable.
  return summary.get(SnapshotSummary.PARTITION_STATS_FILE_LOCATION);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ public UpdateStatistics updateStatistics() {
return new SetStatistics(ops);
}

@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
  // Delegates to the SetPartitionStatistics pending update backed by this table's operations.
  return new SetPartitionStatistics(ops);
}

@Override
public ExpireSnapshots expireSnapshots() {
return new RemoveSnapshots(ops);
Expand Down Expand Up @@ -250,6 +255,11 @@ public List<StatisticsFile> statisticsFiles() {
return ops.current().statisticsFiles();
}

@Override
public List<PartitionStatisticsFile> partitionStatisticsFiles() {
  // Reads the current table metadata on every call; reflects the latest committed state.
  return ops.current().partitionStatisticsFiles();
}

@Override
public Map<String, SnapshotRef> refs() {
return ops.current().refs();
Expand Down
Loading