Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions api/src/main/java/org/apache/iceberg/RemoveUnusedSpecs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.util.List;

/**
* API for removing partition specs from the table metadata when they are not the default spec and
* are no longer referenced by any data files in the table.
* <p>
* When committing, these changes will be applied to the latest table metadata. Commit conflicts
* will be resolved by recalculating which specs are no longer in use against the latest metadata
* and retrying.
* <p>
* {@link #apply()} returns the specs that will remain if committed on the current metadata.
*/
public interface RemoveUnusedSpecs extends PendingUpdate<List<PartitionSpec>> {

}
7 changes: 7 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ default String name() {
*/
AppendFiles newAppend();

/**
 * Remove any partition specs from the metadata that are no longer used in any data files. Always
 * preserves the current default spec even if it has not yet been used.
 * <p>
 * This is a default method so that existing {@code Table} implementations outside this project
 * keep compiling; implementations that support the operation must override it.
 *
 * @return a new {@link RemoveUnusedSpecs}
 * @throws UnsupportedOperationException if this table implementation does not override this method
 */
default RemoveUnusedSpecs removeUnusedSpecs() {
  throw new UnsupportedOperationException(this.getClass().getName() + " doesn't implement removeUnusedSpecs");
}

/**
* Create a new {@link AppendFiles append API} to add files to this table and commit.
* <p>
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ public UpdateProperties updateProperties() {
throw new UnsupportedOperationException("Cannot update the properties of a metadata table");
}

/**
* Not supported on metadata tables.
*
* @throws UnsupportedOperationException always
*/
@Override
public RemoveUnusedSpecs removeUnusedSpecs() {
throw new UnsupportedOperationException("Cannot remove partition specs of a metadata table");
}

/**
* Not supported on metadata tables.
*
* @throws UnsupportedOperationException always
*/
@Override
public ReplaceSortOrder replaceSortOrder() {
throw new UnsupportedOperationException("Cannot update the sort order of a metadata table");
Expand Down
93 changes: 93 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseRemoveUnusedSpecs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Tasks;

import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;

public class BaseRemoveUnusedSpecs implements RemoveUnusedSpecs {
private final TableOperations ops;
private final Table table;

/**
* Creates a pending update that removes unused partition specs.
*
* @param ops table operations used to refresh and commit table metadata
* @param table table whose ALL_ENTRIES metadata table is scanned to find the spec ids in use
*/
public BaseRemoveUnusedSpecs(TableOperations ops, Table table) {
this.ops = ops;
this.table = table;
}

/**
 * Computes the result of this update against freshly refreshed table metadata without committing.
 *
 * @return the partition specs that would remain after unused specs are removed
 */
@Override
public List<PartitionSpec> apply() {
  // Refresh first so the answer is based on the latest metadata known to the operations.
  return removeUnusedSpecs(ops.refresh()).specs();
}

/**
 * Removes unused partition specs and commits the resulting metadata.
 * <p>
 * The retry count and backoff settings are read from the table properties of the metadata
 * refreshed at the start of this call. Only {@link CommitFailedException} triggers a retry; on
 * each attempt the metadata is refreshed and the set of unused specs is recomputed. If no spec
 * would be removed, nothing is committed so that no redundant metadata version is written.
 */
@Override
public void commit() {
  TableMetadata base = ops.refresh();
  Tasks.foreach(ops)
      .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
      .exponentialBackoff(
          base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
          base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
          base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
          2.0 /* exponential */)
      .onlyRetryOn(CommitFailedException.class)
      .run(taskOps -> {
        // Use the operations handed to the task rather than the captured field.
        TableMetadata current = taskOps.refresh();
        TableMetadata newMetadata = removeUnusedSpecs(current);

        // removeUnusedSpecs only filters the existing specs, so an equal count means no change.
        if (newMetadata.specs().size() == current.specs().size()) {
          return;
        }

        taskOps.commit(current, newMetadata);
      });
}

private TableMetadata removeUnusedSpecs(TableMetadata current) {
List<PartitionSpec> specs = current.specs();
int currentSpecId = current.defaultSpecId();

// Read ManifestLists and get all specId's in use
Set<Integer> specsInUse =
Sets.newHashSet(
CloseableIterable.transform(
MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.ALL_ENTRIES)
.newScan()
.planFiles(),
fileScanTask -> ((ManifestEntriesTable.ManifestReadTask) (fileScanTask)).partitionSpecId()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the reason I use this api, and not actually accessing the underlying rows of ALL_MANIFESTS table is that for tables which have corrupt specs (from previous bugs) will be unable to actually generate the common "partition" schema which is required to actually manifest rows. This table instead just gets us ManifestFiles which we can check the specID of directly.

));

List<PartitionSpec> remainingSpecs = specs.stream()
.filter(spec -> spec.specId() == currentSpecId || specsInUse.contains(spec.specId()))
.collect(Collectors.toList());

return current.withSpecs(remainingSpecs);
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ public ReplaceSortOrder replaceSortOrder() {
return new BaseReplaceSortOrder(ops);
}

/**
* Creates a new {@link BaseRemoveUnusedSpecs} backed by this table and its operations.
*/
@Override
public RemoveUnusedSpecs removeUnusedSpecs() {
return new BaseRemoveUnusedSpecs(ops, this);
}

/**
* Creates a new {@code SetLocation} update backed by this table's operations.
*/
@Override
public UpdateLocation updateLocation() {
return new SetLocation(ops);
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,11 @@ public UpdateProperties updateProperties() {
return BaseTransaction.this.updateProperties();
}

/**
* Not supported from within a transaction.
*
* @throws UnsupportedOperationException always
*/
@Override
public RemoveUnusedSpecs removeUnusedSpecs() {
throw new UnsupportedOperationException("Cannot remove unused partition specs as part of a transaction");
}

/**
* Delegates to the enclosing {@code BaseTransaction}'s {@code replaceSortOrder()}.
*/
@Override
public ReplaceSortOrder replaceSortOrder() {
return BaseTransaction.this.replaceSortOrder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,5 +163,9 @@ public CloseableIterable<StructLike> rows() {
// Returns this task as the only element; the requested splitSize is ignored.
public Iterable<FileScanTask> split(long splitSize) {
return ImmutableList.of(this); // don't split
}

// Package-private accessor: returns the partition spec id reported by this task's manifest.
int partitionSpecId() {
return manifest.partitionSpecId();
}
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/SerializableTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ public AppendFiles newAppend() {
throw new UnsupportedOperationException(errorMsg("newAppend"));
}

/**
* Not supported on this table; the exception message is built by {@code errorMsg}.
*
* @throws UnsupportedOperationException always
*/
@Override
public RemoveUnusedSpecs removeUnusedSpecs() {
throw new UnsupportedOperationException(errorMsg("removeUnusedSpecs"));
}

/**
* Not supported on this table; the exception message is built by {@code errorMsg}.
*
* @throws UnsupportedOperationException always
*/
@Override
public RewriteFiles newRewrite() {
throw new UnsupportedOperationException(errorMsg("newRewrite"));
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,17 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}

/**
* Returns new table metadata whose partition specs are replaced by {@code newSpecs}; the other
* fields passed below (schemas, default spec id, sort orders, properties, snapshots and snapshot
* log, among others) are carried over from this instance.
* <p>
* No validation is performed here. Only use when it has been externally validated that the new
* specs completely cover all currently referenceable data files.
* NOTE(review): this method does not check that {@code newSpecs} still contains the spec with
* {@code defaultSpecId}; callers presumably must guarantee that as well -- confirm.
*
* @param newSpecs the complete list of partition specs the returned metadata should hold
* @return a new {@code TableMetadata} built from this instance with {@code newSpecs}
*/
TableMetadata withSpecs(List<PartitionSpec> newSpecs) {
return new TableMetadata(null, formatVersion, uuid, location,
lastSequenceNumber, System.currentTimeMillis(), lastColumnId, currentSchemaId, schemas, defaultSpecId, newSpecs,
lastAssignedPartitionId, defaultSortOrderId, sortOrders, properties, currentSnapshotId,
snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}

private PartitionSpec reassignPartitionIds(PartitionSpec partitionSpec, TypeUtil.NextID nextID) {
PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(partitionSpec.schema())
.withSpecId(partitionSpec.specId());
Expand Down
116 changes: 116 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRemoveUnusedSpecs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Tests for {@code Table.removeUnusedSpecs()}, run once per table format version.
 */
@RunWith(Parameterized.class)
public class TestRemoveUnusedSpecs extends TableTestBase {

  @Parameterized.Parameters(name = "formatVersion = {0}")
  public static Object[] parameters() {
    return new Object[] { 1, 2 };
  }

  public TestRemoveUnusedSpecs(int formatVersion) {
    super(formatVersion);
  }

  /**
   * With no files written under any spec, only the current default spec must survive.
   */
  @Test
  public void testRemoveAllButCurrent() {
    table.updateSchema()
        .addColumn("ts", Types.TimestampType.withoutZone())
        .addColumn("category", Types.StringType.get())
        .commit();
    table.updateSpec().addField("id").commit();
    table.updateSpec().addField("ts").commit();
    table.updateSpec().addField("category").commit();
    table.updateSpec().addField("data").commit();
    Assert.assertEquals(5, table.specs().size());

    PartitionSpec currentSpec = table.spec();
    table.removeUnusedSpecs().commit();

    Assert.assertEquals(1, table.specs().size());
    Assert.assertEquals(currentSpec, table.spec());
  }

  /**
   * A spec referenced only by a delete file must be kept. Delete files are added through a row
   * delta here, so this case is restricted to format version 2.
   */
  @Test
  public void testDontRemoveInUseSpecsV2() {
    Assume.assumeTrue("V2", formatVersion == 2);

    table.updateSchema()
        .addColumn("ts", Types.LongType.get())
        .addColumn("category", Types.StringType.get())
        .commit();

    table.updateSpec().addField("id").commit(); // 1
    table.newAppend().appendFile(newDataFile("data_bucket=0/id=5")).commit();

    table.updateSpec().addField("ts").commit(); // 2

    table.updateSpec().addField("category").commit(); // 3
    table.newRowDelta()
        .addDeletes(newDeleteFile(table.spec().specId(), "data_bucket=0/id=5/ts=100/category=fo"))
        .commit();

    table.updateSpec().addField("data").commit(); // 4
    Assert.assertEquals(5, table.specs().size());

    PartitionSpec currentSpec = table.spec();
    table.removeUnusedSpecs().commit();

    Assert.assertEquals("Missing required spec", ImmutableSet.of(1, 3, 4), table.specs().keySet());
    Assert.assertEquals(currentSpec, table.spec());
  }

  /**
   * Specs referenced by data files must be kept. Only data files are appended, so this case runs
   * for every parameterized format version rather than being limited to version 2.
   */
  @Test
  public void testDontRemoveInUseSpecs() {
    table.updateSchema()
        .addColumn("ts", Types.LongType.get())
        .addColumn("category", Types.StringType.get())
        .commit();

    table.updateSpec().addField("id").commit(); // 1
    table.newAppend().appendFile(newDataFile("data_bucket=0/id=5")).commit();

    table.updateSpec().addField("ts").commit(); // 2

    table.updateSpec().addField("category").commit(); // 3
    table.newAppend().appendFile(newDataFile("data_bucket=0/id=5/ts=100/category=fo")).commit();

    table.updateSpec().addField("data").commit(); // 4
    Assert.assertEquals(5, table.specs().size());

    PartitionSpec currentSpec = table.spec();
    table.removeUnusedSpecs().commit();

    Assert.assertEquals("Missing required spec", ImmutableSet.of(1, 3, 4), table.specs().keySet());
    Assert.assertEquals(currentSpec, table.spec());
  }
}