Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
15b9708
Issue-629: Cherrypick Id
romin-adobe Dec 7, 2019
768e051
Removed redundant methods and changed method name
prodeezy Dec 7, 2019
0c91972
Fix Imports
romin-adobe Dec 7, 2019
fc46ae4
Fix Operation Check
romin-adobe Dec 7, 2019
f73bd0d
Fix Error Message
romin-adobe Dec 7, 2019
d417ef5
Cherry picking operation to apply changes from incoming snapshot on c…
prodeezy Dec 9, 2019
aea0746
Initial working version of cherry-pick operation which applies append…
prodeezy Dec 10, 2019
75ba9c5
Added DS option to set write-wap-id
prodeezy Dec 10, 2019
7e96144
Merge branch 'master' into cherrypick.snapshot
prodeezy Dec 10, 2019
0befad4
Add test cases
romin-adobe Dec 10, 2019
1de9863
Handle Duplicate Cherry Picks
romin-adobe Dec 11, 2019
0a7a04c
Handle Duplicate Cherry Picks
romin-adobe Dec 11, 2019
c7f292b
Differentiate between staged and published wap ids
prodeezy Dec 11, 2019
c73119f
Fix check style errors and put in null check for summary. add check f…
prodeezy Dec 11, 2019
5a7d059
Added SnapshotSummary.PUBLISHED_WAP_ID_PROP prop
prodeezy Dec 16, 2019
ecc6f44
Incorporating some of the feedback
prodeezy Dec 17, 2019
2fb7333
# This is a combination of 7 commits.
romin-adobe Dec 7, 2019
5aceeb4
Issue-629: Cherrypick Id
prodeezy Dec 17, 2019
2a97572
Merge branch 'cherrypick.snapshot' of https://github.com/rominparekh/…
prodeezy Dec 17, 2019
8facf5a
Removed some dead code
prodeezy Dec 17, 2019
4a4ab93
Combined rollback and cherrypick into ManageSnapshots
prodeezy Jan 8, 2020
ecef2cc
Added back and deprecated Rollback api. addressed some of the review …
prodeezy Jan 10, 2020
4b07bc0
reverted formatting change
prodeezy Jan 10, 2020
799e1cc
reverted old to use deplrecated api
prodeezy Jan 10, 2020
2e2ed7a
added back rollback api doc
prodeezy Jan 10, 2020
e037ecb
Added specific exception handling for duplicate wap commits, moved re…
prodeezy Jan 16, 2020
1ae0698
Moved cherrypicking code to the corresponding configuration method
prodeezy Jan 16, 2020
50ec210
Added rollbackTo with ancestor checks, added tests for the same
prodeezy Jan 17, 2020
d123d1a
Addressed review comments
prodeezy Jan 17, 2020
becef88
All public methods in SnapshotUtil should take Table as param
prodeezy Jan 17, 2020
25db12a
WapUtil class added. moved duplicate check to apply()
prodeezy Jan 21, 2020
225374c
NOOP if not target snapshot is configured, took out duplicate add ope…
prodeezy Jan 21, 2020
78c8a91
Added guard from double snapshot ancestor cherrypicking, linking sour…
prodeezy Jan 27, 2020
98816e5
Add test for non-wap cherrypicking
prodeezy Jan 27, 2020
6e11c4f
Fixed checkstyle error
prodeezy Jan 27, 2020
ea79707
Minimize changes and fix commit logic.
rdblue Jan 27, 2020
0a305c4
Merge pull request #2 from rdblue/cherry-pick-snapshot
prodeezy Jan 28, 2020
66363bb
Update UTs and add null check
romin-adobe Jan 29, 2020
cc020ca
Fix refresh logic in SnapshotProducer subclasses.
rdblue Feb 3, 2020
6ca3a63
Merge pull request #4 from rdblue/cherry-pick-snapshot
prodeezy Feb 4, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions api/src/main/java/org/apache/iceberg/ManageSnapshots.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg;

import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.DuplicateWAPCommitException;
import org.apache.iceberg.exceptions.ValidationException;

/**
* API for managing snapshots. Allows rolling table data back to a stated at an older table {@link Snapshot snapshot}.
* Rollback:
* <p>
* This API does not allow conflicting calls to {@link #setCurrentSnapshot(long)} and
* {@link #rollbackToTime(long)}.
* <p>
* When committing, these changes will be applied to the current table metadata. Commit conflicts
* will not be resolved and will result in a {@link CommitFailedException}.
* Cherrypick:
* <p>
* In an audit workflow, new data is written to an orphan {@link Snapshot snapshot} that is not committed as
* the table's current state until it is audited. After auditing a change, it may need to be applied or cherry-picked
* on top of the latest snapshot instead of the one that was current when the audited changes were created.
* This class adds support for cherry-picking the changes from an orphan snapshot by applying them to
* the current snapshot. The output of the operation is a new snapshot with the changes from cherry-picked
* snapshot.
* <p>
*/
public interface ManageSnapshots extends PendingUpdate<Snapshot> {

/**
* Roll this table's data back to a specific {@link Snapshot} identified by id.
*
* @param snapshotId long id of the snapshot to roll back table data to
* @return this for method chaining
* @throws IllegalArgumentException If the table has no snapshot with the given id
*/
ManageSnapshots setCurrentSnapshot(long snapshotId);

/**
* Roll this table's data back to the last {@link Snapshot} before the given timestamp.
*
* @param timestampMillis a long timestamp, as returned by {@link System#currentTimeMillis()}
* @return this for method chaining
* @throws IllegalArgumentException If the table has no old snapshot before the given timestamp
*/
ManageSnapshots rollbackToTime(long timestampMillis);

/**
* Rollback table's state to a specific {@link Snapshot} identified by id.
* @param snapshotId long id of snapshot id to roll back table to. Must be an ancestor of the current snapshot
* @throws IllegalArgumentException If the table has no snapshot with the given id
* @throws ValidationException If given snapshot id is not an ancestor of the current state
*/
ManageSnapshots rollbackTo(long snapshotId);

/**
* Apply supported changes in given snapshot and create a new snapshot which will be set as the
* current snapshot on commit.
* @param snapshotId a snapshotId whose changes to apply
* @return this for method chaining
* @throws IllegalArgumentException If the table has no snapshot with the given id
* @throws DuplicateWAPCommitException In case of a WAP workflow and if the table has has a duplicate commit with same
* wapId
*/
ManageSnapshots cherrypick(long snapshotId);
}
5 changes: 4 additions & 1 deletion api/src/main/java/org/apache/iceberg/Rollback.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public interface Rollback extends PendingUpdate<Snapshot> {
* @param snapshotId long id of the snapshot to roll back table data to
* @return this for method chaining
* @throws IllegalArgumentException If the table has no snapshot with the given id
* @deprecated Replaced by {@link ManageSnapshots#setCurrentSnapshot(long)}
*/
@Deprecated
Rollback toSnapshotId(long snapshotId);

/**
Expand All @@ -47,7 +49,8 @@ public interface Rollback extends PendingUpdate<Snapshot> {
* @param timestampMillis a long timestamp, as returned by {@link System#currentTimeMillis()}
* @return this for method chaining
* @throws IllegalArgumentException If the table has no old snapshot before the given timestamp
* @deprecated Replaced by {@link ManageSnapshots#rollbackToTime(long)}
*/
@Deprecated
Rollback toSnapshotAtTime(long timestampMillis);

}
8 changes: 8 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,17 @@ default AppendFiles newFastAppend() {
* Create a new {@link Rollback rollback API} to roll back to a previous snapshot and commit.
*
* @return a new {@link Rollback}
* @deprecated Replaced by {@link #manageSnapshots()}
*/
@Deprecated
Rollback rollback();

/**
* Create a new {@link ManageSnapshots manage snapshots API} to manage snapshots in this table and commit.
* @return a new {@link ManageSnapshots}
*/
ManageSnapshots manageSnapshots();

/**
* Create a new {@link Transaction transaction API} to commit multiple table operations at once.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.exceptions;

/**
* This exception occurs when one cherrypicks an ancestor or when the picked snapshot is already linked to
* a published ancestor. This additionally helps avoid duplicate cherrypicks on non-WAP snapshots.
*/
public class CherrypickAncestorCommitException extends ValidationException {

public CherrypickAncestorCommitException(long snapshotId) {
super("Cannot cherrypick snapshot %s: already an ancestor", String.valueOf(snapshotId));
}

public CherrypickAncestorCommitException(long snapshotId, long publishedAncestorId) {
super("Cannot cherrypick snapshot %s: already picked to create ancestor %s",
String.valueOf(snapshotId), String.valueOf(publishedAncestorId));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.exceptions;

/**
* This exception occurs when the WAP workflow detects a duplicate wap commit. This helps clients
* to detect duplicate snapshots that are connected by the same wap id.
*/
public class DuplicateWAPCommitException extends ValidationException {
Copy link
Contributor Author

@prodeezy prodeezy Jan 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exception helps with making WAP workflow idempotent. We use this exception on the DataFrameWriter to differentiate from a generic validation error so that we can NOOP and report success if the wap id was already committed. I added this so that the caller doesn't have to depend on the error message rather can catch a specific exception.


public DuplicateWAPCommitException(String wapId) {
super("Duplicate request to cherry pick wap id that was published already: %s", wapId);
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ public Rollback rollback() {
throw new UnsupportedOperationException("Cannot roll back a metadata table");
}

@Override
public ManageSnapshots manageSnapshots() {
throw new UnsupportedOperationException("Cannot manage snapshots in a metadata table");
}

@Override
public Transaction newTransaction() {
throw new UnsupportedOperationException("Cannot create transactions for a metadata table");
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private void cacheChanges() {
manifest -> Objects.equal(manifest.snapshotId(), snapshotId));
try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(io, changedManifests)
.ignoreExisting()
.select(ManifestReader.CHANGE_WITH_STATS_COLUMNS)
.select(ManifestReader.ALL_COLUMNS)
.entries()) {
for (ManifestEntry entry : entries) {
switch (entry.status()) {
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ public Rollback rollback() {
return new RollbackToSnapshot(ops);
}

@Override
public ManageSnapshots manageSnapshots() {
return new SnapshotManager(ops);
}

@Override
public Transaction newTransaction() {
return Transactions.newTransaction(ops);
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,11 @@ public Rollback rollback() {
throw new UnsupportedOperationException("Transaction tables do not support rollback");
}

@Override
public ManageSnapshots manageSnapshots() {
throw new UnsupportedOperationException("Transaction tables do not support managing snapshots");
}

@Override
public Transaction newTransaction() {
throw new UnsupportedOperationException("Cannot create a transaction within a transaction");
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/ManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
public class ManifestReader extends CloseableGroup implements Filterable<FilteredManifest> {
private static final Logger LOG = LoggerFactory.getLogger(ManifestReader.class);

private static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
static final ImmutableList<String> CHANGE_COLUMNS = ImmutableList.of(
"file_path", "file_format", "partition", "record_count", "file_size_in_bytes");
static final ImmutableList<String> CHANGE_WITH_STATS_COLUMNS = ImmutableList.<String>builder()
Expand Down
48 changes: 4 additions & 44 deletions core/src/main/java/org/apache/iceberg/RollbackToSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,61 +19,21 @@

package org.apache.iceberg;

import com.google.common.base.Preconditions;
import org.apache.iceberg.exceptions.ValidationException;

class RollbackToSnapshot implements Rollback {
private final TableOperations ops;
private TableMetadata base = null;
private Long targetSnapshotId = null;
class RollbackToSnapshot extends SnapshotManager implements Rollback {

RollbackToSnapshot(TableOperations ops) {
this.ops = ops;
this.base = ops.current(); // do not retry
super(ops);
}

@Override
public Rollback toSnapshotId(long snapshotId) {
Preconditions.checkArgument(base.snapshot(snapshotId) != null,
"Cannot roll back to unknown snapshot id: %s", snapshotId);

this.targetSnapshotId = snapshotId;

super.setCurrentSnapshot(snapshotId);
return this;
}

@Override
public Rollback toSnapshotAtTime(long timestampMillis) {
long snapshotId = 0;
long snapshotTimestamp = 0;
// find the latest snapshot by timestamp older than timestampMillis
for (Snapshot snapshot : base.snapshots()) {
if (snapshot.timestampMillis() < timestampMillis &&
snapshot.timestampMillis() > snapshotTimestamp) {
snapshotId = snapshot.snapshotId();
snapshotTimestamp = snapshot.timestampMillis();
}
}

Preconditions.checkArgument(base.snapshot(snapshotId) != null,
"Cannot roll back, no valid snapshot older than: %s", timestampMillis);

this.targetSnapshotId = snapshotId;

super.rollbackToTime(timestampMillis);
return this;
}

@Override
public Snapshot apply() {
ValidationException.check(targetSnapshotId != null,
"Cannot roll back to unknown version: call toSnapshotId or toSnapshotAtTime");
return base.snapshot(targetSnapshotId);
}

@Override
public void commit() {
// rollback does not refresh or retry. it only operates on the state of the table when rollback
// was called to create the transaction.
ops.commit(base, base.rollbackTo(apply()));
}
}
Loading