Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,23 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
}
}

class RemoveSortOrders implements MetadataUpdate {
private final Set<Integer> sortOrderIds;

public RemoveSortOrders(Set<Integer> sortOrderIds) {
this.sortOrderIds = sortOrderIds;
}

public Set<Integer> sortOrderIds() {
return sortOrderIds;
}

@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.removeSortOrders(sortOrderIds);
}
}

class RemoveSchemas implements MetadataUpdate {
private final Set<Integer> schemaIds;

Expand Down
19 changes: 19 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ private MetadataUpdateParser() {}
static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics";
static final String REMOVE_PARTITION_SPECS = "remove-partition-specs";
static final String REMOVE_SCHEMAS = "remove-schemas";
static final String REMOVE_SORT_ORDERS = "remove-sort-orders";
static final String ADD_ENCRYPTION_KEY = "add-encryption-key";
static final String REMOVE_ENCRYPTION_KEY = "remove-encryption-key";

Expand Down Expand Up @@ -134,6 +135,9 @@ private MetadataUpdateParser() {}
// RemoveSchemas
private static final String SCHEMA_IDS = "schema-ids";

// RemoveSortOrders
private static final String SORT_ORDER_IDS = "sort-order-ids";

// AddEncryptionKey
private static final String ENCRYPTION_KEY = "encryption-key";

Expand Down Expand Up @@ -165,6 +169,7 @@ private MetadataUpdateParser() {}
.put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION)
.put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS)
.put(MetadataUpdate.RemoveSchemas.class, REMOVE_SCHEMAS)
.put(MetadataUpdate.RemoveSortOrders.class, REMOVE_SORT_ORDERS)
.put(MetadataUpdate.AddEncryptionKey.class, ADD_ENCRYPTION_KEY)
.put(MetadataUpdate.RemoveEncryptionKey.class, REMOVE_ENCRYPTION_KEY)
.buildOrThrow();
Expand Down Expand Up @@ -271,6 +276,9 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
case REMOVE_ENCRYPTION_KEY:
writeRemoveEncryptionKey((MetadataUpdate.RemoveEncryptionKey) metadataUpdate, generator);
break;
case REMOVE_SORT_ORDERS:
writeRemoveSortOrders((MetadataUpdate.RemoveSortOrders) metadataUpdate, generator);
break;
default:
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -350,6 +358,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {
return readAddEncryptionKey(jsonNode);
case REMOVE_ENCRYPTION_KEY:
return readRemoveEncryptionKey(jsonNode);
case REMOVE_SORT_ORDERS:
return readSortOrders(jsonNode);
default:
throw new UnsupportedOperationException(
String.format("Cannot convert metadata update action to json: %s", action));
Expand Down Expand Up @@ -493,6 +503,11 @@ private static void writeRemoveSchemas(
JsonUtil.writeIntegerArray(SCHEMA_IDS, metadataUpdate.schemaIds(), gen);
}

private static void writeRemoveSortOrders(
MetadataUpdate.RemoveSortOrders metadataUpdate, JsonGenerator gen) throws IOException {
JsonUtil.writeIntegerArray(SORT_ORDER_IDS, metadataUpdate.sortOrderIds(), gen);
}

private static void writeAddEncryptionKey(
MetadataUpdate.AddEncryptionKey update, JsonGenerator gen) throws IOException {
gen.writeFieldName(ENCRYPTION_KEY);
Expand Down Expand Up @@ -654,6 +669,10 @@ private static MetadataUpdate readRemoveSchemas(JsonNode node) {
return new MetadataUpdate.RemoveSchemas(JsonUtil.getIntegerSet(SCHEMA_IDS, node));
}

private static MetadataUpdate readSortOrders(JsonNode node) {
return new MetadataUpdate.RemoveSortOrders(JsonUtil.getIntegerSet(SORT_ORDER_IDS, node));
}

private static MetadataUpdate readAddEncryptionKey(JsonNode node) {
JsonNode keyNode = node.get(ENCRYPTION_KEY);
Preconditions.checkArgument(
Expand Down
30 changes: 27 additions & 3 deletions core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -224,16 +226,31 @@ private TableMetadata internalApply() {
reachableSpecs.add(base.defaultSpecId());
Set<Integer> reachableSchemas = Sets.newConcurrentHashSet();
reachableSchemas.add(base.currentSchemaId());
Set<Integer> reachableSortOrders = Sets.newConcurrentHashSet();
reachableSortOrders.add(base.defaultSortOrderId());

Tasks.foreach(idsToRetain)
.executeWith(planExecutorService())
.run(
snapshotId -> {
Snapshot snapshot = base.snapshot(snapshotId);
snapshot.allManifests(ops.io()).stream()
.map(ManifestFile::partitionSpecId)
.forEach(reachableSpecs::add);
List<ManifestFile> manifests = snapshot.allManifests(ops.io());

// Collect partition spec IDs
manifests.stream().map(ManifestFile::partitionSpecId).forEach(reachableSpecs::add);

reachableSchemas.add(snapshot.schemaId());

// Collect sort order IDs from live entries
manifests.stream()
.map(manifestFile -> ManifestFiles.open(manifestFile, ops.io()))
.map(ManifestReader::entries)
.flatMap(entries -> StreamSupport.stream(entries.spliterator(), false))
.filter(ManifestEntry::isLive)
.map(ManifestEntry::file)
.map(ContentFile::sortOrderId)
.filter(Objects::nonNull)
.forEach(reachableSortOrders::add);
});

Set<Integer> specsToRemove =
Expand All @@ -249,6 +266,13 @@ private TableMetadata internalApply() {
.filter(schemaId -> !reachableSchemas.contains(schemaId))
.collect(Collectors.toSet());
updatedMetaBuilder.removeSchemas(schemasToRemove);

Set<Integer> sortOrdersToRemove =
base.sortOrders().stream()
.map(SortOrder::orderId)
.filter(sortId -> !reachableSortOrders.contains(sortId))
.collect(Collectors.toSet());
updatedMetaBuilder.removeSortOrders(sortOrdersToRemove);
}

return updatedMetaBuilder.build();
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,21 @@ public Builder addSortOrder(SortOrder order) {
return this;
}

Builder removeSortOrders(Iterable<Integer> sortIds) {
Set<Integer> sortIdsToRemove = Sets.newHashSet(sortIds);
Preconditions.checkArgument(
!sortIdsToRemove.contains(defaultSortOrderId), "Cannot remove the default sort-id");

if (!sortIdsToRemove.isEmpty()) {
this.sortOrders =
sortOrders.stream()
.filter(s -> !sortIdsToRemove.contains(s.orderId()))
.collect(Collectors.toList());
changes.add(new MetadataUpdate.RemoveSortOrders(sortIdsToRemove));
}
return this;
}

public Builder addSnapshot(Snapshot snapshot) {
if (snapshot == null) {
// change is a noop
Expand Down
26 changes: 19 additions & 7 deletions core/src/main/java/org/apache/iceberg/UpdateRequirements.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ private Builder update(MetadataUpdate update) {
update((MetadataUpdate.RemovePartitionSpecs) update);
} else if (update instanceof MetadataUpdate.RemoveSchemas) {
update((MetadataUpdate.RemoveSchemas) update);
} else if (update instanceof MetadataUpdate.RemoveSortOrders) {
update((MetadataUpdate.RemoveSortOrders) update);
}

return this;
Expand Down Expand Up @@ -156,13 +158,7 @@ private void update(MetadataUpdate.SetDefaultPartitionSpec unused) {
}

private void update(MetadataUpdate.SetDefaultSortOrder unused) {
if (!setOrderId) {
if (base != null && !isReplace) {
// require that the default write order has not changed
require(new UpdateRequirement.AssertDefaultSortOrderID(base.defaultSortOrderId()));
}
this.setOrderId = true;
}
requireDefaultSortOrderNotChanged();
}

private void update(MetadataUpdate.RemovePartitionSpecs unused) {
Expand All @@ -179,6 +175,12 @@ private void update(MetadataUpdate.RemoveSchemas unused) {
requireNoBranchesChanged();
}

private void update(MetadataUpdate.RemoveSortOrders unused) {
requireDefaultSortOrderNotChanged();

requireNoBranchesChanged();
}

private void requireDefaultPartitionSpecNotChanged() {
if (!setSpecId) {
if (base != null && !isReplace) {
Expand All @@ -197,6 +199,16 @@ private void requireCurrentSchemaNotChanged() {
}
}

private void requireDefaultSortOrderNotChanged() {
if (!setOrderId) {
if (base != null && !isReplace) {
// require that the default write order has not changed
require(new UpdateRequirement.AssertDefaultSortOrderID(base.defaultSortOrderId()));
}
this.setOrderId = true;
}
}

private void requireNoBranchesChanged() {
if (base != null && !isReplace) {
base.refs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,17 @@ public void testRemoveSchemas() {
.isEqualTo(json);
}

@Test
public void testRemoveSortOrders() {
String action = MetadataUpdateParser.REMOVE_SORT_ORDERS;
String json = "{\"action\":\"remove-sort-orders\",\"sort-order-ids\":[1,2,3]}";
MetadataUpdate expected = new MetadataUpdate.RemoveSortOrders(ImmutableSet.of(1, 2, 3));
assertEquals(action, expected, MetadataUpdateParser.fromJson(json));
assertThat(MetadataUpdateParser.toJson(expected))
.as("Remove Sort Orders should convert to the correct JSON value")
.isEqualTo(json);
}

@Test
public void testAddEncryptionKey() {
byte[] keyBytes = "key".getBytes(StandardCharsets.UTF_8);
Expand Down Expand Up @@ -1113,6 +1124,11 @@ public void assertEquals(
(MetadataUpdate.RemoveSchemas) expectedUpdate,
(MetadataUpdate.RemoveSchemas) actualUpdate);
break;
case MetadataUpdateParser.REMOVE_SORT_ORDERS:
assertEqualsRemoveSortOrders(
(MetadataUpdate.RemoveSortOrders) expectedUpdate,
(MetadataUpdate.RemoveSortOrders) actualUpdate);
break;
case MetadataUpdateParser.ADD_ENCRYPTION_KEY:
assertEqualsAddEncryptionKey(
(MetadataUpdate.AddEncryptionKey) expectedUpdate,
Expand Down Expand Up @@ -1354,6 +1370,11 @@ private static void assertEqualsRemoveSchemas(
assertThat(actual.schemaIds()).containsExactlyInAnyOrderElementsOf(expected.schemaIds());
}

private static void assertEqualsRemoveSortOrders(
MetadataUpdate.RemoveSortOrders expected, MetadataUpdate.RemoveSortOrders actual) {
assertThat(actual.sortOrderIds()).containsExactlyInAnyOrderElementsOf(expected.sortOrderIds());
}

private static void assertEqualsAddEncryptionKey(
MetadataUpdate.AddEncryptionKey expected, MetadataUpdate.AddEncryptionKey actual) {
assertThat(actual.key().keyId()).isEqualTo(expected.key().keyId());
Expand Down
53 changes: 53 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -1823,6 +1823,59 @@ public void testNoSchemasOrSpecsToRemove() {
meta ->
meta.changes().stream()
.anyMatch(u -> u instanceof MetadataUpdate.RemoveSchemas)));
Mockito.verify(ops, Mockito.never())
.commit(
any(),
argThat(
meta ->
meta.changes().stream()
.anyMatch(u -> u instanceof MetadataUpdate.RemoveSortOrders)));
}

@TestTemplate
public void testRemoveSortOrders() {
SortOrder idSortOrder = SortOrder.builderFor(table.schema()).asc("id").build();
DataFile initialFile =
DataFiles.builder(SPEC)
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0") // easy way to set partition data for now
.withRecordCount(1)
.withSortOrder(idSortOrder)
.build();
table.newAppend().appendFile(initialFile).commit();

Set<String> expectedDeletedFiles = Sets.newHashSet();
expectedDeletedFiles.add(table.currentSnapshot().manifestListLocation());

table.replaceSortOrder().asc("data").commit();

DataFile dataFile =
DataFiles.builder(SPEC)
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0") // easy way to set partition data for now
.withRecordCount(1)
.withSortOrder(table.sortOrder())
.build();
table.newAppend().appendFile(dataFile).commit();
expectedDeletedFiles.add(table.currentSnapshot().manifestListLocation());

table.newDelete().deleteFile(initialFile).commit();

assertThat(table.sortOrders()).hasSize(2);

Set<String> deletedFiles = Sets.newHashSet();
// Expire all snapshots and sort-orders except the current ones.
removeSnapshots(table)
.expireOlderThan(System.currentTimeMillis())
.cleanExpiredMetadata(true)
.deleteWith(deletedFiles::add)
.commit();

// other manifest files can be present, as there is delete file
assertThat(deletedFiles).containsAll(expectedDeletedFiles);
assertThat(table.sortOrders().values()).containsExactly(table.sortOrder());
}

@TestTemplate
Expand Down
18 changes: 18 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestTableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -1960,4 +1960,22 @@ public void testMetadataWithRemoveSchemas() {

assertThat(meta.changes()).anyMatch(u -> u instanceof MetadataUpdate.RemoveSchemas);
}

@Test
public void testMetadataWithRemoveSortOrders() {
TableMetadata base =
TableMetadata.newTableMetadata(
TestBase.SCHEMA, PartitionSpec.unpartitioned(), null, ImmutableMap.of());
TableMetadata meta =
TableMetadata.buildFrom(base)
.addSortOrder(SortOrder.builderFor(base.schema()).asc("id").build())
.addSortOrder(SortOrder.builderFor(base.schema()).asc("data").build())
.removeSortOrders(Sets.newHashSet())
.build();

assertThat(meta.changes()).noneMatch(u -> u instanceof MetadataUpdate.RemoveSortOrders);

meta = TableMetadata.buildFrom(meta).removeSortOrders(Sets.newHashSet(1)).build();
assertThat(meta.changes()).anyMatch(u -> u instanceof MetadataUpdate.RemoveSortOrders);
}
}
Loading