diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index 1e0e963297..fe265c3072 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -287,4 +287,15 @@ public static void enforceFeatureEnabledOrThrow(
               + "This should only be set to 'true' for tests!")
           .defaultValue(false)
           .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Boolean> ICEBERG_ROLLBACK_COMPACTION_ON_CONFLICTS =
+      PolarisConfiguration.<Boolean>builder()
+          .key("ICEBERG_ROLLBACK_COMPACTION_ON_CONFLICTS")
+          .catalogConfig("polaris.config.rollback.compaction.on-conflicts.enabled")
+          .description(
+              "Roll back replace snapshots created by compaction that have the "
+                  + "polaris.internal.conflict-resolution.by-operation-type.replace property "
+                  + "set to 'rollback' in their snapshot summary")
+          .defaultValue(false)
+          .buildFeatureConfiguration();
 }
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
index 614fab7255..dca58933cb 100644
--- a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
+++ b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
@@ -19,6 +19,8 @@
 package org.apache.polaris.service.quarkus.catalog;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.doReturn;
@@ -29,6 +31,8 @@
 import com.azure.core.exception.HttpResponseException;
 import com.google.cloud.storage.StorageException;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
 import io.quarkus.test.junit.QuarkusMock;
 import io.quarkus.test.junit.QuarkusTestProfile;
 import io.quarkus.test.junit.TestProfile;
@@ -37,6 +41,7 @@
 import jakarta.inject.Inject;
 import jakarta.ws.rs.core.SecurityContext;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.lang.reflect.Method;
 import java.time.Clock;
 import java.util.Arrays;
@@ -48,16 +53,29 @@
 import java.util.UUID;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataUpdate;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.UpdateRequirement;
+import org.apache.iceberg.UpdateRequirements;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.CatalogTests;
 import org.apache.iceberg.catalog.Namespace;
@@ -68,8 +86,11 @@
 import org.apache.iceberg.exceptions.ForbiddenException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
@@ -111,6 +132,7 @@
 import org.apache.polaris.core.storage.cache.StorageCredentialCache;
 import org.apache.polaris.service.admin.PolarisAdminService;
 import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
+import org.apache.polaris.service.catalog.iceberg.CatalogHandlerUtils;
 import org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
 import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
 import org.apache.polaris.service.catalog.io.ExceptionMappingFileIO;
@@ -135,7 +157,9 @@
 import org.apache.polaris.service.types.NotificationRequest;
 import org.apache.polaris.service.types.NotificationType;
 import org.apache.polaris.service.types.TableUpdateNotification;
+import org.assertj.core.api.AbstractCollectionAssert;
 import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ListAssert;
 import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
 import org.assertj.core.configuration.PreferredAssumptionException;
 import org.junit.jupiter.api.AfterEach;
@@ -164,6 +188,15 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> {
         PreferredAssumptionException.JUNIT5);
   }
 
+  DeleteFile FILE_A_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofPositionDeletes()
+          .withPath("/path/to/data-a-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("id_bucket=0") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
   public static class Profile implements QuarkusTestProfile {
 
     @Override
@@ -554,6 +587,332 @@ public void testCreateNestedNamespaceUnderMissingParent() {
         .hasMessageContaining("Parent");
   }
 
+  @Test
+  public void testConcurrentWritesWithRollbackNonEmptyTable() {
+    IcebergCatalog catalog = catalog();
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create();
+    assertNoFiles(table);
+
+    // commit FILE_A
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+    assertFiles(catalog.loadTable(TABLE), FILE_A);
+    table.refresh();
+
+    long lastSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Apply the deletes based on FILE_A;
+    // this should conflict when we try to commit it later, because the table will have changed.
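+    // What follows mirrors what an Iceberg REST client stages before a commit: apply() builds
+    // the new snapshot locally, and UpdateRequirements.forUpdateTable derives the requirements
+    // (including AssertRefSnapshotID on "main") that the server re-validates at commit time.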
+    RowDelta originalRowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(FILE_A_DELETES)
+            .validateFromSnapshot(lastSnapshotId)
+            .validateDataFilesExist(List.of(FILE_A.location()));
+    // Make the client ready with its updates, but don't reach out to the IRC server yet
+    Snapshot s = originalRowDelta.apply();
+    TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
+    TableMetadata base = ops.current();
+    TableMetadata.Builder update = TableMetadata.buildFrom(base);
+    update.setBranchSnapshot(s, "main");
+    TableMetadata updatedMetadata = update.build();
+    List<MetadataUpdate> updates = updatedMetadata.changes();
+    List<UpdateRequirement> requirements = UpdateRequirements.forUpdateTable(base, updates);
+    UpdateTableRequest request = UpdateTableRequest.create(TABLE, requirements, updates);
+
+    // replace FILE_A with FILE_B and
+    // set the snapshot property in the summary to make this snapshot
+    // rollback-able.
+    catalog
+        .loadTable(TABLE)
+        .newRewrite()
+        .addFile(FILE_B)
+        .deleteFile(FILE_A)
+        .set("polaris.internal.conflict-resolution.by-operation-type.replace", "rollback")
+        .commit();
+
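+    // With rollback enabled, the server is expected to notice that "main" moved, see that the
+    // only intervening snapshot is the rollback-able REPLACE above, drop it, and land the
+    // staged row delta on top of the original append.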
+    try {
+      // Now call the IRC server to commit the delete operation.
+      CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(5, true);
+      catalogHandlerUtils.commit(((BaseTable) catalog.loadTable(TABLE)).operations(), request);
+    } catch (Exception e) {
+      fail("Rollback compaction on conflicts feature failed: " + e);
+    }
+
+    table.refresh();
+
+    // Assert that only 2 snapshots remain and no REPLACE snapshot is left.
+    Snapshot currentSnapshot = table.snapshot(table.refs().get("main").snapshotId());
+    int totalSnapshots = 1;
+    while (currentSnapshot.parentId() != null) {
+      // no snapshot in the hierarchy for REPLACE operations
+      assertThat(currentSnapshot.operation()).isNotEqualTo(DataOperations.REPLACE);
+      currentSnapshot = table.snapshot(currentSnapshot.parentId());
+      totalSnapshots += 1;
+    }
+    assertThat(totalSnapshots).isEqualTo(2);
+
+    // Inspect the files: 1 delete file, i.e. FILE_A_DELETES, and 1 data file, i.e. FILE_A
+    try {
+      try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+        List<String> dataFilePaths =
+            Streams.stream(tasks)
+                .map(ContentScanTask::file)
+                .map(ContentFile::location)
+                .collect(Collectors.toList());
+        List<String> deleteFilePaths =
+            Streams.stream(tasks)
+                .flatMap(t -> t.deletes().stream().map(ContentFile::location))
+                .collect(Collectors.toList());
+        Assertions.assertThat(dataFilePaths)
+            .as("Should contain expected number of data files")
+            .hasSize(1);
+        Assertions.assertThat(deleteFilePaths)
+            .as("Should contain expected number of delete files")
+            .hasSize(1);
+        Assertions.assertThat(CharSequenceSet.of(dataFilePaths))
+            .as("Should contain correct file paths")
+            .isEqualTo(
+                CharSequenceSet.of(
+                    Iterables.transform(Arrays.asList(FILE_A), ContentFile::location)));
+        Assertions.assertThat(CharSequenceSet.of(deleteFilePaths))
+            .as("Should contain correct file paths")
+            .isEqualTo(
+                CharSequenceSet.of(
+                    Iterables.transform(Arrays.asList(FILE_A_DELETES), ContentFile::location)));
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
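+  // Same staged row delta as above, but a regular append (FILE_C) lands on "main" after the
+  // compaction REPLACE, so the intervening snapshots are not all no-op REPLACEs and the
+  // server must give up and surface the conflict instead of rolling back.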
+  @Test
+  public void testConcurrentWritesWithRollbackWithNonReplaceSnapshotInBetween() {
+    IcebergCatalog catalog = catalog();
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create();
+    assertNoFiles(table);
+
+    // commit FILE_A
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+    assertFiles(catalog.loadTable(TABLE), FILE_A);
+    table.refresh();
+
+    long lastSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Apply the deletes based on FILE_A;
+    // this should conflict when we try to commit it later, because the table will have changed.
+    RowDelta originalRowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(FILE_A_DELETES)
+            .validateFromSnapshot(lastSnapshotId)
+            .validateDataFilesExist(List.of(FILE_A.location()));
+    // Make the client ready with its updates, but don't reach out to the IRC server yet
+    Snapshot s = originalRowDelta.apply();
+    TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
+    TableMetadata base = ops.current();
+    TableMetadata.Builder update = TableMetadata.buildFrom(base);
+    update.setBranchSnapshot(s, "main");
+    TableMetadata updatedMetadata = update.build();
+    List<MetadataUpdate> updates = updatedMetadata.changes();
+    List<UpdateRequirement> requirements = UpdateRequirements.forUpdateTable(base, updates);
+    UpdateTableRequest request = UpdateTableRequest.create(TABLE, requirements, updates);
+
+    // replace FILE_A with FILE_B and commit the transaction.
+    catalog
+        .loadTable(TABLE)
+        .newRewrite()
+        .addFile(FILE_B)
+        .deleteFile(FILE_A)
+        .set("polaris.internal.conflict-resolution.by-operation-type.replace", "rollback")
+        .commit();
+
+    // commit FILE_C
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_C).commit();
+    CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(5, true);
+    Assertions.assertThatThrownBy(
+            () ->
+                catalogHandlerUtils.commit(
+                    ((BaseTable) catalog.loadTable(TABLE)).operations(), request))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Requirement failed: branch main has changed");
+
+    table.refresh();
+
+    // Assert only 3 snapshots
+    Snapshot currentSnapshot = table.snapshot(table.refs().get("main").snapshotId());
+    int totalSnapshots = 1;
+    while (currentSnapshot.parentId() != null) {
+      currentSnapshot = table.snapshot(currentSnapshot.parentId());
+      totalSnapshots += 1;
+    }
+    assertThat(totalSnapshots).isEqualTo(3);
+    assertFiles(catalog.loadTable(TABLE), FILE_B, FILE_C);
+  }
+
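+  // Here the REPLACE snapshot to be rolled back is still reachable from the "non-main"
+  // branch, so removing it would corrupt that branch's history; the server must refuse the
+  // rollback and surface the conflict.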
+  @Test
+  public void
+      testConcurrentWritesWithRollbackEnableWithToRollbackSnapshotReferencedByOtherBranch() {
+    IcebergCatalog catalog = catalog();
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create();
+    assertNoFiles(table);
+
+    // commit FILE_A
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+    assertFiles(catalog.loadTable(TABLE), FILE_A);
+    table.refresh();
+
+    long lastSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Apply the deletes based on FILE_A;
+    // this should conflict when we try to commit it later, because the table will have changed.
+    RowDelta originalRowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(FILE_A_DELETES)
+            .validateFromSnapshot(lastSnapshotId)
+            .validateDataFilesExist(List.of(FILE_A.location()));
+    // Make the client ready with its updates, but don't reach out to the IRC server yet
+    Snapshot s = originalRowDelta.apply();
+    TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
+    TableMetadata base = ops.current();
+    TableMetadata.Builder update = TableMetadata.buildFrom(base);
+    update.setBranchSnapshot(s, "main");
+    TableMetadata updatedMetadata = update.build();
+    List<MetadataUpdate> updates = updatedMetadata.changes();
+    List<UpdateRequirement> requirements = UpdateRequirements.forUpdateTable(base, updates);
+    UpdateTableRequest request = UpdateTableRequest.create(TABLE, requirements, updates);
+
+    // replace FILE_A with FILE_B
+    catalog
+        .loadTable(TABLE)
+        .newRewrite()
+        .addFile(FILE_B)
+        .deleteFile(FILE_A)
+        .set("polaris.internal.conflict-resolution.by-operation-type.replace", "rollback")
+        .commit();
+
+    Table t = catalog.loadTable(TABLE);
+    // add another branch, "non-main"
+    t.manageSnapshots()
+        .createBranch("non-main")
+        .setCurrentSnapshot(t.currentSnapshot().snapshotId())
+        .commit();
+    // now add more files to the non-main branch
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_C).toBranch("non-main").commit();
+    CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(5, true);
+    Assertions.assertThatThrownBy(
+            () ->
+                catalogHandlerUtils.commit(
+                    ((BaseTable) catalog.loadTable(TABLE)).operations(), request))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Requirement failed: branch main has changed");
+
+    table.refresh();
+
+    // Assert only 2 snapshots remain in main's history
+    Snapshot currentSnapshot = table.snapshot(table.refs().get("main").snapshotId());
+    int totalSnapshots = 1;
+    while (currentSnapshot.parentId() != null) {
+      currentSnapshot = table.snapshot(currentSnapshot.parentId());
+      totalSnapshots += 1;
+    }
+    assertThat(totalSnapshots).isEqualTo(2);
+    assertFiles(catalog.loadTable(TABLE), FILE_B);
+  }
+
+  @Test
+  public void testConcurrentWritesWithRollbackWithConcurrentWritesToDifferentBranches() {
+    IcebergCatalog catalog = catalog();
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(NS);
+    }
+
+    Table table = catalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create();
+    assertNoFiles(table);
+
+    // commit FILE_A to the main branch
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+    assertFiles(catalog.loadTable(TABLE), FILE_A);
+    table.refresh();
+
+    Table t = catalog.loadTable(TABLE);
+    // add another branch, "non-main"
+    t.manageSnapshots()
+        .createBranch("non-main")
+        .setCurrentSnapshot(t.currentSnapshot().snapshotId())
+        .commit();
+
+    long lastSnapshotId = table.currentSnapshot().snapshotId();
+
+    // Apply the deletes based on FILE_A;
+    // this should conflict when we try to commit it later, because the table will have changed.
+    RowDelta originalRowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(FILE_A_DELETES)
+            .validateFromSnapshot(lastSnapshotId)
+            .validateDataFilesExist(List.of(FILE_A.location()));
+    // Make the client ready with its updates, but don't reach out to the IRC server yet
+    Snapshot s = originalRowDelta.apply();
+    TableOperations ops = ((BaseTable) catalog.loadTable(TABLE)).operations();
+    TableMetadata base = ops.current();
+    TableMetadata.Builder update = TableMetadata.buildFrom(base);
+    update.setBranchSnapshot(s, "main");
+    TableMetadata updatedMetadata = update.build();
+    List<MetadataUpdate> updates = updatedMetadata.changes();
+    List<UpdateRequirement> requirements = UpdateRequirements.forUpdateTable(base, updates);
+    UpdateTableRequest request = UpdateTableRequest.create(TABLE, requirements, updates);
+
+    // replace FILE_A with FILE_B on the main branch
+    catalog
+        .loadTable(TABLE)
+        .newRewrite()
+        .addFile(FILE_B)
+        .deleteFile(FILE_A)
+        .set("polaris.internal.conflict-resolution.by-operation-type.replace", "rollback")
+        .commit();
+
+    // now add more files to the non-main branch; this makes the sequence numbers
+    // non-monotonic for the main branch
+    catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_C).toBranch("non-main").commit();
+    CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(5, true);
+    Assertions.assertThatThrownBy(
+            () ->
+                catalogHandlerUtils.commit(
+                    ((BaseTable) catalog.loadTable(TABLE)).operations(), request))
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Requirement failed: branch main has changed");
+
+    table.refresh();
+
+    // Assert only 2 snapshots remain in main's history
+    Snapshot currentSnapshot = table.snapshot(table.refs().get("main").snapshotId());
+    int totalSnapshots = 1;
+    while (currentSnapshot.parentId() != null) {
+      currentSnapshot = table.snapshot(currentSnapshot.parentId());
+      totalSnapshots += 1;
+    }
+    assertThat(totalSnapshots).isEqualTo(2);
+    assertFiles(catalog.loadTable(TABLE), FILE_B);
+  }
+
   @Test
   public void testValidateNotificationWhenTableAndNamespacesDontExist() {
     Assumptions.assumeTrue(
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
index aa99d53f50..ae879ea5f0 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
@@ -22,16 +22,21 @@
 import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
 import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
+import java.lang.reflect.Field;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,9 +44,13 @@
 import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.BaseTransaction;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.MetadataUpdate;
 import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
@@ -74,6 +83,8 @@
 import org.apache.iceberg.rest.responses.LoadViewResponse;
 import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
 import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.view.BaseView;
 import org.apache.iceberg.view.SQLViewRepresentation;
@@ -85,6 +96,8 @@
 import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.config.PolarisConfigurationStore;
 import org.apache.polaris.core.context.RealmContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * CODE_COPIED_TO_POLARIS Copied from CatalogHandler in Iceberg 1.8.0 Contains a collection of
@@ -92,17 +105,41 @@
  */
 @ApplicationScoped
 public class CatalogHandlerUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CatalogHandlerUtils.class);
+
   private static final Schema EMPTY_SCHEMA = new Schema();
   private static final String INITIAL_PAGE_TOKEN = "";
+  private static final String CONFLICT_RESOLUTION_ACTION =
+      "polaris.internal.conflict-resolution.by-operation-type.replace";
+  private static final Field LAST_SEQUENCE_NUMBER_FIELD;
 
-  private final RealmContext realmContext;
-  private final PolarisConfigurationStore configurationStore;
+  static {
+    try {
+      LAST_SEQUENCE_NUMBER_FIELD =
+          TableMetadata.Builder.class.getDeclaredField("lastSequenceNumber");
+      LAST_SEQUENCE_NUMBER_FIELD.setAccessible(true);
+    } catch (NoSuchFieldException e) {
+      throw new RuntimeException("Unable to access field", e);
+    }
+  }
+
+  private final int maxCommitRetries;
+  private final boolean rollbackCompactionEnabled;
 
   @Inject
   public CatalogHandlerUtils(
       RealmContext realmContext, PolarisConfigurationStore configurationStore) {
-    this.realmContext = realmContext;
-    this.configurationStore = configurationStore;
+    this(
+        configurationStore.getConfiguration(
+            realmContext, FeatureConfiguration.ICEBERG_COMMIT_MAX_RETRIES),
+        configurationStore.getConfiguration(
+            realmContext, FeatureConfiguration.ICEBERG_ROLLBACK_COMPACTION_ON_CONFLICTS));
+  }
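+
+  // Test-only constructor: bypasses the configuration store so tests can set the retry count
+  // and toggle rollback-on-conflicts directly.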
+  @VisibleForTesting
+  public CatalogHandlerUtils(int maxCommitRetries, boolean rollbackCompactionEnabled) {
+    this.maxCommitRetries = maxCommitRetries;
+    this.rollbackCompactionEnabled = rollbackCompactionEnabled;
   }
 
   /**
@@ -421,11 +458,12 @@ private TableMetadata create(TableOperations ops, UpdateTableRequest request) {
     return ops.current();
   }
 
-  protected TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
+  @VisibleForTesting
+  public TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
     AtomicBoolean isRetry = new AtomicBoolean(false);
     try {
       Tasks.foreach(ops)
-          .retry(maxCommitRetries())
+          .retry(maxCommitRetries)
           .exponentialBackoff(
               COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
               COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
@@ -435,30 +473,106 @@ protected TableMetadata commit(TableOperations ops, UpdateTableRequest request)
           .run(
               taskOps -> {
                 TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current();
                 isRetry.set(true);
 
-                // validate requirements
+                TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base);
+                TableMetadata newBase = base;
                 try {
                   request.requirements().forEach(requirement -> requirement.validate(base));
+                } catch (CommitFailedException e) {
+                  if (!rollbackCompactionEnabled) {
+                    // wrap and rethrow outside of tasks to avoid unnecessary retry
+                    throw new ValidationFailureException(e);
+                  }
+                  LOGGER.debug(
+                      "Attempting to roll back replace operations for table={}, with current-snapshot-id={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId());
+                  UpdateRequirement.AssertRefSnapshotID assertRefSnapshotId =
+                      findAssertRefSnapshotID(request);
+                  MetadataUpdate.SetSnapshotRef setSnapshotRef = findSetSnapshotRefUpdate(request);
+
+                  if (assertRefSnapshotId == null || setSnapshotRef == null) {
+                    // This implies the request was not trying to add a snapshot.
+                    LOGGER.debug(
+                        "Giving up on rolling back replace operations for table={}, with current-snapshot-id={}, as the operation doesn't attempt to add a single snapshot",
+                        base.uuid(),
+                        base.currentSnapshot().snapshotId());
+                    // wrap and rethrow outside of tasks to avoid unnecessary retry
+                    throw new ValidationFailureException(e);
+                  }
+
+                  // the snapshot-id at which the client expects the table's current snapshot to be
+                  long expectedCurrentSnapshotId = assertRefSnapshotId.snapshotId();
+
+                  MetadataUpdate.AddSnapshot snapshotToBeAdded = findAddSnapshotUpdate(request);
+                  if (snapshotToBeAdded == null) {
+                    // Re-throw if there's no snapshot data to be added.
+                    // wrap and rethrow outside of tasks to avoid unnecessary retry
+                    throw new ValidationFailureException(e);
+                  }
+
+                  LOGGER.info(
+                      "Attempting to roll back replace operation for table={}, with current-snapshot-id={}, to snapshot={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId(),
+                      snapshotToBeAdded.snapshot().snapshotId());
+
+                  List<MetadataUpdate> metadataUpdates =
+                      generateUpdatesToRemoveNoopSnapshot(
+                          base, expectedCurrentSnapshotId, setSnapshotRef.name());
+
+                  if (metadataUpdates == null || metadataUpdates.isEmpty()) {
+                    // Nothing can be done, as this implies that not all snapshots between
+                    // expectedCurrentSnapshotId and the current snapshot were no-op (REPLACE)
+                    // snapshots; hence re-throw the exception caught.
+                    // wrap and rethrow outside of tasks to avoid unnecessary retry
+                    throw new ValidationFailureException(e);
+                  }
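+
+                  // The rollback proceeds in three steps: (1) point the ref back at the
+                  // snapshot the client expects, (2) remove the no-op REPLACE snapshots that
+                  // were sitting on top of it, and (3) rewind lastSequenceNumber so the staged
+                  // snapshot still applies cleanly (see setAppropriateLastSeqNumber).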
+                  // Set the ref we wanted to update back to the snapshot-id at which the
+                  // client expects the table to be.
+                  metadataBuilder.setBranchSnapshot(
+                      expectedCurrentSnapshotId, setSnapshotRef.name());
+
+                  // apply the remove-snapshots updates to the current metadata.
+                  // NOTE: we need to set the ref to expectedCurrentSnapshotId first and then
+                  // apply the removes, as otherwise the remove would drop the reference.
+                  // NOTE: we can skip removing the now-orphaned base snapshot; it's not a hard
+                  // requirement, just something good to do rather than leaving it to a
+                  // remove-orphans run. The ref has now been rolled back correctly to the
+                  // parent of the snapshot to be committed.
+                  metadataUpdates.forEach(update -> update.applyTo(metadataBuilder));
+                  newBase =
+                      setAppropriateLastSeqNumber(
+                              metadataBuilder,
+                              base.uuid(),
+                              base.lastSequenceNumber(),
+                              base.snapshot(expectedCurrentSnapshotId).sequenceNumber())
+                          .build();
+                  LOGGER.info(
+                      "Successfully rolled back replace operation for table={}, with current-snapshot-id={}, to snapshot={}",
+                      base.uuid(),
+                      base.currentSnapshot().snapshotId(),
+                      newBase.currentSnapshot().snapshotId());
+                }
+
+                // double-check that the requirements pass now.
+                try {
+                  TableMetadata baseWithRemovedSnaps = newBase;
+                  request
+                      .requirements()
+                      .forEach(requirement -> requirement.validate(baseWithRemovedSnaps));
                 } catch (CommitFailedException e) {
                   // wrap and rethrow outside of tasks to avoid unnecessary retry
                   throw new ValidationFailureException(e);
                 }
 
-                // apply changes
-                TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base);
-                request.updates().forEach(update -> update.applyTo(metadataBuilder));
-
-                TableMetadata updated = metadataBuilder.build();
+                TableMetadata.Builder newMetadataBuilder = TableMetadata.buildFrom(newBase);
+                request.updates().forEach(update -> update.applyTo(newMetadataBuilder));
+                TableMetadata updated = newMetadataBuilder.build();
                 if (updated.changes().isEmpty()) {
                   // do not commit if the metadata has not changed
                   return;
                 }
-
-                // commit
                 taskOps.commit(base, updated);
               });
-
     } catch (ValidationFailureException e) {
       throw e.wrapped();
     }
@@ -466,6 +580,140 @@ protected TableMetadata commit(TableOperations ops, UpdateTableRequest request)
     return ops.current();
   }
 
+  private UpdateRequirement.AssertRefSnapshotID findAssertRefSnapshotID(
+      UpdateTableRequest request) {
+    UpdateRequirement.AssertRefSnapshotID assertRefSnapshotID = null;
+    int total = 0;
+    for (UpdateRequirement requirement : request.requirements()) {
+      if (requirement instanceof UpdateRequirement.AssertRefSnapshotID) {
+        ++total;
+        assertRefSnapshotID = (UpdateRequirement.AssertRefSnapshotID) requirement;
+      }
+    }
+
+    // if there is more than one ref assertion, it's not safe to roll back; make this a no-op.
+    return total != 1 ? null : assertRefSnapshotID;
+  }
+
+  private List<MetadataUpdate> generateUpdatesToRemoveNoopSnapshot(
+      TableMetadata base, long expectedCurrentSnapshotId, String updateRefName) {
+    // find all the snapshots we want to retain that are not part of the branch being updated.
+    Set<Long> idsToRetain = Sets.newHashSet();
+    for (Map.Entry<String, SnapshotRef> ref : base.refs().entrySet()) {
+      String refName = ref.getKey();
+      SnapshotRef snapshotRef = ref.getValue();
+      if (refName.equals(updateRefName)) {
+        continue;
+      }
+      idsToRetain.add(snapshotRef.snapshotId());
+      // Always check the ancestry of both branches and tags, mostly for the case where a
+      // branch was created and then dropped, then a tag was created, and after the rollback
+      // the tag was dropped and the branch re-created on it.
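+      // e.g. if "main" is S1 <- S2 (REPLACE) and a tag still points at S2, rolling "main"
+      // back to S1 must not remove S2: it stays reachable through the tag, so it lands in
+      // idsToRetain.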
+      for (Snapshot ancestor :
+          SnapshotUtil.ancestorsOf(snapshotRef.snapshotId(), base::snapshot)) {
+        idsToRetain.add(ancestor.snapshotId());
+      }
+    }
+
+    List<MetadataUpdate> updateToRemoveSnapshot = new ArrayList<>();
+    Long snapshotId = base.ref(updateRefName).snapshotId(); // current tip of the given branch
+    // ensure this branch has the latest sequence number.
+    long expectedSequenceNumber = base.lastSequenceNumber();
+    // Otherwise this is an unexpected state: the table's current sequence number is not equal
+    // to the sequence number of the most recent snapshot the ref points to.
+    if (expectedSequenceNumber != base.snapshot(snapshotId).sequenceNumber()) {
+      LOGGER.debug(
+          "Giving up rolling back table {} to snapshot {}, ref's current snapshot sequence number {} is not equal to the expected sequence number {}",
+          base.uuid(),
+          snapshotId,
+          base.snapshot(snapshotId).sequenceNumber(),
+          expectedSequenceNumber);
+      return null;
+    }
+    Set<Long> snapshotsToRemove = new LinkedHashSet<>();
+    while (snapshotId != null && !Objects.equals(snapshotId, expectedCurrentSnapshotId)) {
+      Snapshot snap = base.snapshot(snapshotId);
+      if (!isRollbackSnapshot(snap) || idsToRetain.contains(snapshotId)) {
+        // Either we encountered a snapshot that is not a no-op, or the snapshot is referenced
+        // by another branch or tag.
+        LOGGER.debug(
+            "Giving up rolling back table {} to snapshot {}, as a snapshot to be removed is either not a no-op or is referenced by another branch or tag ancestor",
+            base.uuid(),
+            snapshotId);
+        break;
+      }
+      snapshotsToRemove.add(snap.snapshotId());
+      snapshotId = snap.parentId();
+    }
+
+    boolean wasExpectedSnapshotReached = Objects.equals(snapshotId, expectedCurrentSnapshotId);
+    updateToRemoveSnapshot.add(new MetadataUpdate.RemoveSnapshots(snapshotsToRemove));
+    return wasExpectedSnapshotReached ? updateToRemoveSnapshot : null;
+  }
+
+  private boolean isRollbackSnapshot(Snapshot snapshot) {
+    // Only REPLACE snapshots whose summary sets the conflict-resolution property to
+    // "rollback" are allowed to be rolled back.
+    return DataOperations.REPLACE.equals(snapshot.operation())
+        && PropertyUtil.propertyAsString(snapshot.summary(), CONFLICT_RESOLUTION_ACTION, "")
+            .equalsIgnoreCase("rollback");
+  }
+
+  private MetadataUpdate.SetSnapshotRef findSetSnapshotRefUpdate(UpdateTableRequest request) {
+    int total = 0;
+    MetadataUpdate.SetSnapshotRef setSnapshotRefUpdate = null;
+    // find the SetSnapshotRef update
+    for (MetadataUpdate update : request.updates()) {
+      if (update instanceof MetadataUpdate.SetSnapshotRef) {
+        total++;
+        setSnapshotRefUpdate = (MetadataUpdate.SetSnapshotRef) update;
+      }
+    }
+
+    // if there is more than one SetSnapshotRef update, it's not safe to roll back; make this
+    // a no-op.
+    return total != 1 ? null : setSnapshotRefUpdate;
+  }
+
+  private MetadataUpdate.AddSnapshot findAddSnapshotUpdate(UpdateTableRequest request) {
+    int total = 0;
+    MetadataUpdate.AddSnapshot addSnapshot = null;
+    // find the AddSnapshot update
+    for (MetadataUpdate update : request.updates()) {
+      if (update instanceof MetadataUpdate.AddSnapshot) {
+        total++;
+        addSnapshot = (MetadataUpdate.AddSnapshot) update;
+      }
+    }
+
+    // if there is more than one AddSnapshot update, it's not safe to roll back; make this a
+    // no-op.
+    return total != 1 ? null : addSnapshot;
+  }
+
+  private TableMetadata.Builder setAppropriateLastSeqNumber(
+      TableMetadata.Builder metadataBuilder,
+      String tableUUID,
+      long currentSequenceNumber,
+      long expectedSequenceNumber) {
+    // TODO: Get rid of the reflection call once TableMetadata has an API for this.
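+    // Example: append S1 (seq 1), then a rolled-back REPLACE S2 (seq 2). The client staged
+    // its snapshot against S1 with sequence number 2 baked into its manifest list, but the
+    // metadata still records lastSequenceNumber=2 after the REPLACE is removed; rewinding it
+    // to S1's seq 1 lets the incoming snapshot commit as seq 2.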
+    // Move the lastSequenceNumber back so the staged snapshot applies cleanly to the current
+    // metadata. Sequence numbers are expected to increase monotonically snapshot over
+    // snapshot; the client has already generated the manifest list, so the sequence number
+    // baked into the snapshot can't be changed. The only remaining option is to change the
+    // sequence number tracked by metadata.json.
+    try {
+      // this should point to the sequence number that the current tip of the
+      // branch belongs to, as the new commit will be applied on top of it.
+      LAST_SEQUENCE_NUMBER_FIELD.set(metadataBuilder, expectedSequenceNumber);
+      LOGGER.info(
+          "Setting table uuid:{} last sequence number from:{} to:{}",
+          tableUUID,
+          currentSequenceNumber,
+          expectedSequenceNumber);
+    } catch (IllegalAccessException ex) {
+      throw new RuntimeException(ex);
+    }
+    return metadataBuilder;
+  }
+
   private BaseView asBaseView(View view) {
     Preconditions.checkState(
         view instanceof BaseView, "Cannot wrap catalog that does not produce BaseView");
@@ -565,7 +813,7 @@ protected ViewMetadata commit(ViewOperations ops, UpdateTableRequest request) {
     AtomicBoolean isRetry = new AtomicBoolean(false);
     try {
       Tasks.foreach(ops)
-          .retry(maxCommitRetries())
+          .retry(maxCommitRetries)
           .exponentialBackoff(
               COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
               COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
@@ -606,9 +854,4 @@ protected ViewMetadata commit(ViewOperations ops, UpdateTableRequest request) {
     return ops.current();
   }
-
-  private int maxCommitRetries() {
-    return configurationStore.getConfiguration(
-        realmContext, FeatureConfiguration.ICEBERG_COMMIT_MAX_RETRIES);
-  }
 }