Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions api/src/main/java/org/apache/iceberg/PartitionStatisticsFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

/**
* Represents a partition statistics file that can be used to read table data more efficiently.
*
* <p>Statistics are informational. A reader can choose to ignore statistics information. Statistics
* support is not required to read the table correctly.
*/
public interface PartitionStatisticsFile {
/** ID of the Iceberg table's snapshot the partition statistics file is associated with. */
long snapshotId();

/** Returns fully qualified path to the file. Never null. */
String path();

/** Returns the size of the partition statistics file in bytes. */
long fileSizeInBytes();
}
17 changes: 17 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

/** Represents a table. */
public interface Table {
Expand Down Expand Up @@ -286,6 +287,17 @@ default UpdateStatistics updateStatistics() {
"Updating statistics is not supported by " + getClass().getName());
}

/**
* Create a new {@link UpdatePartitionStatistics update partition statistics API} to add or remove
* partition statistics files in this table.
*
* @return a new {@link UpdatePartitionStatistics}
*/
default UpdatePartitionStatistics updatePartitionStatistics() {
throw new UnsupportedOperationException(
"Updating partition statistics is not supported by " + getClass().getName());
}

/**
* Create a new {@link ExpireSnapshots expire API} to manage snapshots in this table and commit.
*
Expand Down Expand Up @@ -327,6 +339,11 @@ default UpdateStatistics updateStatistics() {
*/
List<StatisticsFile> statisticsFiles();

/** Returns the current partition statistics files for the table. */
default List<PartitionStatisticsFile> partitionStatisticsFiles() {
return ImmutableList.of();
}

/**
* Returns the current refs for the table
*
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ default UpdateStatistics updateStatistics() {
"Updating statistics is not supported by " + getClass().getName());
}

/**
* Create a new {@link UpdatePartitionStatistics update partition statistics API} to add or remove
* partition statistics files in this table.
*
* @return a new {@link UpdatePartitionStatistics}
*/
default UpdatePartitionStatistics updatePartitionStatistics() {
throw new UnsupportedOperationException(
"Updating partition statistics is not supported by " + getClass().getName());
}

/**
* Create a new {@link ExpireSnapshots expire API} to manage snapshots in this table.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.List;

/** API for updating partition statistics files in a table. */
public interface UpdatePartitionStatistics extends PendingUpdate<List<PartitionStatisticsFile>> {
/**
* Set the table's partition statistics file for given snapshot, replacing the previous partition
* statistics file for the snapshot if any exists.
*
* @return this for method chaining
*/
UpdatePartitionStatistics setPartitionStatistics(PartitionStatisticsFile partitionStatisticsFile);

/**
* Remove the table's partition statistics file for given snapshot.
*
* @return this for method chaining
*/
UpdatePartitionStatistics removePartitionStatistics(long snapshotId);
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ public List<StatisticsFile> statisticsFiles() {
return ImmutableList.of();
}

@Override
public List<PartitionStatisticsFile> partitionStatisticsFiles() {
return ImmutableList.of();
}

@Override
public Map<String, SnapshotRef> refs() {
return table().refs();
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ public UpdateStatistics updateStatistics() {
"Cannot update statistics of a " + descriptor + " table");
}

@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
throw new UnsupportedOperationException(
"Cannot update partition statistics of a " + descriptor + " table");
}

@Override
public ExpireSnapshots expireSnapshots() {
throw new UnsupportedOperationException(
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ public UpdateStatistics updateStatistics() {
return new SetStatistics(ops);
}

@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
return new SetPartitionStatistics(ops);
}

@Override
public ExpireSnapshots expireSnapshots() {
return new RemoveSnapshots(ops);
Expand Down Expand Up @@ -255,6 +260,11 @@ public List<StatisticsFile> statisticsFiles() {
return ops.current().statisticsFiles();
}

@Override
public List<PartitionStatisticsFile> partitionStatisticsFiles() {
return ops.current().partitionStatisticsFiles();
}

@Override
public Map<String, SnapshotRef> refs() {
return ops.current().refs();
Expand Down
19 changes: 19 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,15 @@ public UpdateStatistics updateStatistics() {
return updateStatistics;
}

@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
checkLastOperationCommitted("UpdatePartitionStatistics");
UpdatePartitionStatistics updatePartitionStatistics =
new SetPartitionStatistics(transactionOps);
updates.add(updatePartitionStatistics);
return updatePartitionStatistics;
}

@Override
public ExpireSnapshots expireSnapshots() {
checkLastOperationCommitted("ExpireSnapshots");
Expand Down Expand Up @@ -733,6 +742,11 @@ public UpdateStatistics updateStatistics() {
return BaseTransaction.this.updateStatistics();
}

@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
return BaseTransaction.this.updatePartitionStatistics();
}

@Override
public ExpireSnapshots expireSnapshots() {
return BaseTransaction.this.expireSnapshots();
Expand Down Expand Up @@ -769,6 +783,11 @@ public List<StatisticsFile> statisticsFiles() {
return current.statisticsFiles();
}

@Override
public List<PartitionStatisticsFile> partitionStatisticsFiles() {
return current.partitionStatisticsFiles();
}

@Override
public Map<String, SnapshotRef> refs() {
return current.refs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public UpdateStatistics updateStatistics() {
return wrapped.updateStatistics();
}

@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
return wrapped.updatePartitionStatistics();
}

@Override
public ExpireSnapshots expireSnapshots() {
return wrapped.expireSnapshots();
Expand Down
19 changes: 14 additions & 5 deletions core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.CloseableIterable;
Expand Down Expand Up @@ -86,6 +85,11 @@ protected void deleteFiles(Set<String> pathsToDelete, String fileType) {
.run(deleteFunc::accept);
}

protected boolean hasAnyStatisticsFiles(TableMetadata tableMetadata) {
return !tableMetadata.statisticsFiles().isEmpty()
|| !tableMetadata.partitionStatisticsFiles().isEmpty();
}

protected Set<String> expiredStatisticsFilesLocations(
TableMetadata beforeExpiration, TableMetadata afterExpiration) {
Set<String> statsFileLocationsBeforeExpiration = statsFileLocations(beforeExpiration);
Expand All @@ -98,10 +102,15 @@ private Set<String> statsFileLocations(TableMetadata tableMetadata) {
Set<String> statsFileLocations = Sets.newHashSet();

if (tableMetadata.statisticsFiles() != null) {
statsFileLocations =
tableMetadata.statisticsFiles().stream()
.map(StatisticsFile::path)
.collect(Collectors.toSet());
tableMetadata.statisticsFiles().stream()
.map(StatisticsFile::path)
.forEach(statsFileLocations::add);
}

if (tableMetadata.partitionStatisticsFiles() != null) {
tableMetadata.partitionStatisticsFiles().stream()
.map(PartitionStatisticsFile::path)
.forEach(statsFileLocations::add);
}

return statsFileLocations;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import org.immutables.value.Value;

@Value.Immutable
public interface GenericPartitionStatisticsFile extends PartitionStatisticsFile {}
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
deleteFiles(manifestsToDelete, "manifest");
deleteFiles(manifestListsToDelete, "manifest list");

if (!beforeExpiration.statisticsFiles().isEmpty()) {
if (hasAnyStatisticsFiles(beforeExpiration)) {
Set<String> expiredStatisticsFilesLocations =
expiredStatisticsFilesLocations(beforeExpiration, afterExpiration);
deleteFiles(expiredStatisticsFilesLocations, "statistics files");
Expand Down
38 changes: 38 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,44 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
}
}

class SetPartitionStatistics implements MetadataUpdate {
private final PartitionStatisticsFile partitionStatisticsFile;

public SetPartitionStatistics(PartitionStatisticsFile partitionStatisticsFile) {
this.partitionStatisticsFile = partitionStatisticsFile;
}

public long snapshotId() {
return partitionStatisticsFile.snapshotId();
}

public PartitionStatisticsFile partitionStatisticsFile() {
return partitionStatisticsFile;
}

@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.setPartitionStatistics(partitionStatisticsFile);
}
}

class RemovePartitionStatistics implements MetadataUpdate {
private final long snapshotId;

public RemovePartitionStatistics(long snapshotId) {
this.snapshotId = snapshotId;
}

public long snapshotId() {
return snapshotId;
}

@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.removePartitionStatistics(snapshotId);
}
}

class AddSnapshot implements MetadataUpdate {
private final Snapshot snapshot;

Expand Down
Loading