diff --git a/docs/branching-and-tagging.md b/docs/branching-and-tagging.md
new file mode 100644
index 000000000000..f2d8606edcb1
--- /dev/null
+++ b/docs/branching-and-tagging.md
@@ -0,0 +1,122 @@
+---
+title: "Branching and Tagging"
+url: branching
+aliases:
+    - "tables/branching"
+menu:
+    main:
+        parent: Tables
+        weight: 0
+---
+
+# Branching and Tagging
+
+## Overview
+
+Iceberg table metadata maintains a log of snapshots, which represent the changes applied to a table.
+Snapshots are fundamental in Iceberg as they are the basis for reader isolation and time travel queries.
+To control metadata size and storage costs, Iceberg provides snapshot lifecycle management procedures such as [`expire_snapshots`](../../spark/spark-procedures/#expire-snapshots), which remove unused snapshots and data files that are no longer necessary, based on table snapshot retention properties.
+
+**For more sophisticated snapshot lifecycle management, Iceberg supports branches and tags, which are named references to snapshots with their own independent lifecycles. These lifecycles are controlled by branch- and tag-level retention policies.**
+Branches are independent lineages of snapshots and point to the head of the lineage.
+Branches and tags have a maximum reference age property which controls when the reference to the snapshot itself should be expired.
+Branches have retention properties which define the minimum number of snapshots to retain on a branch as well as the maximum age of individual snapshots to retain on the branch.
+These properties are used when the `expireSnapshots` procedure is run.
+For details on the algorithm used by `expireSnapshots`, refer to the [spec](../../../spec#snapshot-retention-policy).
+
+## Use Cases
+
+Branching and tagging can be used for handling GDPR requirements and retaining important historical snapshots for auditing.
+Branches can also be used as part of data engineering workflows, for enabling experimental branches for testing and validating new jobs.
+See below for some examples of how branching and tagging can facilitate these use cases.
+
+### Historical Tags
+
+Tags can be used for retaining important historical snapshots for auditing purposes.
+
+![Historical Tags](../img/historical-snapshot-tag.png)
+
+The above diagram demonstrates retaining important historical snapshots with the following retention policy, defined via Spark SQL.
+
+1. Retain 1 snapshot per week for 1 month. This can be achieved by tagging the weekly snapshot and setting the tag retention to be a month.
+```sql
+-- Create a tag for the first end-of-week snapshot. Retain the snapshot for a week.
+ALTER TABLE prod.db.table CREATE TAG `EOW-01` AS OF VERSION 7 RETAIN 7 DAYS
+```
+
+2. Retain 1 snapshot per month for 6 months. This can be achieved by tagging the monthly snapshot and setting the tag retention to be 6 months.
+```sql
+-- Create a tag for the first end-of-month snapshot. Retain the snapshot for 6 months.
+ALTER TABLE prod.db.table CREATE TAG `EOM-01` AS OF VERSION 30 RETAIN 180 DAYS
+```
+
+3. Retain 1 snapshot per year forever. This can be achieved by tagging the annual snapshot. The default retention for branches and tags is forever.
+```sql
+-- Create a tag for the end of the year and retain it forever.
+ALTER TABLE prod.db.table CREATE TAG `EOY-2023` AS OF VERSION 365
+```
+
+4. Create a temporary "test-branch". Only the latest 2 snapshots on the branch will be kept, and the branch reference itself will be retained for 1 week.
+```sql
+-- Create a branch test-branch which will be retained for 7 days along with the latest 2 snapshots
+ALTER TABLE prod.db.table CREATE BRANCH `test-branch` RETAIN 7 DAYS WITH RETENTION 2 SNAPSHOTS
+```
+
+### Audit Branch
+
+![Audit Branch](../img/audit-branch.png)
+
+The above diagram shows an example of using an audit branch for validating a write workflow.
+
+1. First ensure `write.wap.enabled` is set.
+```sql
+ALTER TABLE db.table SET TBLPROPERTIES (
+    'write.wap.enabled'='true'
+)
+```
+2. Create `audit-branch` starting from snapshot 3, which will be written to and retained for 1 week.
+```sql
+ALTER TABLE db.table CREATE BRANCH `audit-branch` AS OF VERSION 3 RETAIN 7 DAYS
+```
+3. Writes are performed on `audit-branch`, independent from the main table history.
+```sql
+-- WAP branch write
+SET spark.wap.branch = 'audit-branch';
+INSERT INTO prod.db.table VALUES (3, 'c');
+```
+4. A validation workflow can then validate the state of `audit-branch` (e.g. data quality checks).
+5. After validation, the main branch can be fast-forwarded to the head of `audit-branch` to update the main table state.
+```java
+table.manageSnapshots().fastForward("main", "audit-branch").commit();
+```
+6. The branch reference will be removed when `expireSnapshots` is run 1 week later.
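+
+If the audit branch is no longer needed once its changes have been published, it can also be dropped explicitly instead of waiting for the reference to expire. A minimal sketch using the `DROP BRANCH` DDL described later in this document:
+```sql
+-- Optionally remove the audit branch right after fast-forwarding main
+ALTER TABLE db.table DROP BRANCH `audit-branch`
+```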
+
+## Usage
+
+Creating, querying, and writing to branches and tags are supported in the Iceberg Java library, and in the Spark and Flink engine integrations.
+
+- [Iceberg Java Library](../../java-api-quickstart/#branching-and-tagging)
+- [Spark DDLs](../spark-ddl/#branching-and-tagging-ddl)
+- [Spark Reads](../spark-queries/#time-travel)
+- [Spark Branch Writes](../spark-writes/#writing-to-branches)
+- [Flink Reads](../flink-queries/#reading-branches-and-tags-with-SQL)
+- [Flink Branch Writes](../flink-writes/#branch-writes)
\ No newline at end of file
diff --git a/docs/flink-configuration.md b/docs/flink-configuration.md
index 5df2a440ada1..2085da4adddf 100644
--- a/docs/flink-configuration.md
+++ b/docs/flink-configuration.md
@@ -118,7 +118,11 @@ env.getConfig()
| starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive. INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive. If the timestamp is between two snapshots, it should start from the snapshot after the timestamp. Just for FIP27 Source. |
| start-snapshot-timestamp | N/A | N/A | null | Start to read data from the most recent snapshot as of the given time in milliseconds. |
| start-snapshot-id | N/A | N/A | null | Start to read data from the specified snapshot-id. |
-| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. |
+| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. |
+| branch | N/A | N/A | main | Specifies the branch to read from in batch mode. |
+| tag | N/A | N/A | null | Specifies the tag to read from in batch mode. |
+| start-tag | N/A | N/A | null | Specifies the starting tag to read from for incremental reads. |
+| end-tag | N/A | N/A | null | Specifies the ending tag to read from for incremental reads. |
| split-size | connector.iceberg.split-size | read.split.target-size | 128 MB | Target size when combining input splits. |
| split-lookback | connector.iceberg.split-file-open-cost | read.split.planning-lookback | 10 | Number of bins to consider when combining input splits. |
| split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | 4MB | The estimated cost to open a file, used as a minimum weight when combining splits. |
diff --git a/docs/flink-getting-started.md b/docs/flink-getting-started.md
index 677d628c2807..078bc12466c1 100644
--- a/docs/flink-getting-started.md
+++ b/docs/flink-getting-started.md
@@ -271,6 +271,16 @@ FlinkSink.forRowData(input)
env.execute("Test Iceberg DataStream");
```

+### Branch Writes
+Writing to branches in Iceberg tables is also supported via the `toBranch` API in `FlinkSink`.
+For more information on branches, please refer to [branches](../../tables/branching).
+```java
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .toBranch("audit-branch")
+    .append();
+```
+
## Reading

Submit a Flink __batch__ job using the following sentences:
diff --git a/docs/flink-queries.md b/docs/flink-queries.md
index 9afabe28aa61..2d62d18e61d7 100644
--- a/docs/flink-queries.md
+++ b/docs/flink-queries.md
@@ -80,6 +80,21 @@ Here are the SQL settings for the [FLIP-27](https://cwiki.apache.org/confluence/
SET table.exec.iceberg.use-flip27-source = true;
```

+### Reading branches and tags with SQL
+Branches and tags can be read via SQL by specifying the branch or tag as a read option. For more details,
+refer to [Flink Configuration](../flink-configuration/#read-options).
+
+```sql
+-- Read from branch b1
+SELECT * FROM table /*+ OPTIONS('branch'='b1') */;
+
+-- Read from tag t1
+SELECT * FROM table /*+ OPTIONS('tag'='t1') */;
+
+-- Incremental scan from tag t1 to tag t2
+SELECT * FROM table /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-tag'='t1', 'end-tag'='t2') */;
+```
+
## Reading with DataStream

Iceberg support streaming or batch read in Java API now.
@@ -197,6 +212,37 @@ env.execute("Test Iceberg Streaming Read");

There are other options that could be set by Java API, please see the [IcebergSource#Builder](../../../javadoc/{{% icebergVersion %}}/org/apache/iceberg/flink/source/IcebergSource.html).
+### Reading branches and tags with DataStream
+Branches and tags can also be read via the DataStream API.
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
+// Batch read from a branch
+DataStream<RowData> branchRead = FlinkSource.forRowData()
+    .env(env)
+    .tableLoader(tableLoader)
+    .branch("test-branch")
+    .streaming(false)
+    .build();
+
+// Batch read from a tag
+DataStream<RowData> tagRead = FlinkSource.forRowData()
+    .env(env)
+    .tableLoader(tableLoader)
+    .tag("test-tag")
+    .streaming(false)
+    .build();
+
+// Streaming read starting from the snapshot referenced by start-tag
+DataStream<RowData> streamingRead = FlinkSource.forRowData()
+    .env(env)
+    .tableLoader(tableLoader)
+    .streaming(true)
+    .startTag("test-tag")
+    .build();
+```
+
### Read as Avro GenericRecord

FLIP-27 Iceberg source provides `AvroGenericRecordReaderFunction` that converts
diff --git a/docs/flink-writes.md b/docs/flink-writes.md
index 22cf0778cece..f61416bfca05 100644
--- a/docs/flink-writes.md
+++ b/docs/flink-writes.md
@@ -190,6 +190,16 @@ FlinkSink.builderFor(
    .append();
```

+### Branch Writes
+Writing to branches in Iceberg tables is also supported via the `toBranch` API in `FlinkSink`.
+For more information on branches, please refer to [branches](../../tables/branching).
+```java
+FlinkSink.forRowData(input)
+    .tableLoader(tableLoader)
+    .toBranch("audit-branch")
+    .append();
+```
+
### Metrics

The following Flink metrics are provided by the Flink Iceberg sink.
diff --git a/docs/java-api-quickstart.md b/docs/java-api-quickstart.md
index 80769be3afc5..24b61b42547f 100644
--- a/docs/java-api-quickstart.md
+++ b/docs/java-api-quickstart.md
@@ -198,3 +198,127 @@ PartitionSpec spec = PartitionSpec.builderFor(schema)
```

For more information on the different partition transforms that Iceberg offers, visit [this page](../../../spec#partitioning).
+
+## Branching and Tagging
+
+### Creating branches and tags
+
+New branches and tags can be created via the Java library's `ManageSnapshots` API.
+
+```java
+/* Create a branch test-branch at snapshot 3. The branch is retained for 1 week, the latest 2 snapshots
+on test-branch are always retained, and snapshots on test-branch created within the last hour are also retained. */
+String branch = "test-branch";
+table.manageSnapshots()
+    .createBranch(branch, 3)
+    .setMinSnapshotsToKeep(branch, 2)
+    .setMaxSnapshotAgeMs(branch, 3600000)
+    .setMaxRefAgeMs(branch, 604800000)
+    .commit();
+
+// Create a tag historical-tag at snapshot 10 which is retained for a day
+String tag = "historical-tag";
+table.manageSnapshots()
+    .createTag(tag, 10)
+    .setMaxRefAgeMs(tag, 86400000)
+    .commit();
+```
+
+### Committing to branches
+
+Writing to a branch can be performed by specifying `toBranch` in the operation. For the full list, refer to [UpdateOperations](../../java/api/#update-operations).
+```java
+// Append FILE_A to branch test-branch
+String branch = "test-branch";
+
+table.newAppend()
+    .appendFile(FILE_A)
+    .toBranch(branch)
+    .commit();
+
+// Perform row-level updates on test-branch
+table.newRowDelta()
+    .addRows(DATA_FILE)
+    .addDeletes(DELETES)
+    .toBranch(branch)
+    .commit();
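+
+// A delete can target the branch in the same way; this sketch assumes FILE_A was
+// appended earlier and that newDelete supports toBranch like the operations above.
+table.newDelete()
+    .deleteFile(FILE_A)
+    .toBranch(branch)
+    .commit();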
+
+// Perform a rewrite operation on test-branch, replacing small_file_1 and small_file_2 with compacted_file
+table.newRewrite()
+    .rewriteFiles(ImmutableSet.of(small_file_1, small_file_2), ImmutableSet.of(compacted_file))
+    .toBranch(branch)
+    .commit();
+```
+
+### Reading from branches and tags
+Reading from a branch or tag can be done as usual via the Table Scan API, by passing a branch or tag name to the `useRef` API. When a branch is passed in, the snapshot used is the head of that branch. Note that currently reading from a branch while also specifying an `asOfSnapshotId` in the scan is not supported.
+
+```java
+// Read from the head snapshot of test-branch
+TableScan branchRead = table.newScan().useRef("test-branch");
+
+// Read from the snapshot referenced by audit-tag
+TableScan tagRead = table.newScan().useRef("audit-tag");
+```
+
+### Replacing and fast forwarding branches and tags
+
+The snapshots which existing branches and tags point to can be updated via the `replace` APIs. The fast forward operation is similar to git fast-forwarding: it advances a target branch to the head of a source branch or a tag when the target branch is an ancestor of the source. For both fast forward and replace, the retention properties of the target branch are maintained by default.
+
+```java
+String branch = "test-branch";
+// Update test-branch to point to snapshot 4
+table.manageSnapshots()
+    .replaceBranch(branch, 4)
+    .commit();
+
+String tag = "audit-tag";
+// Replace audit-tag to point to snapshot 3 and update its retention
+table.manageSnapshots()
+    .replaceTag(tag, 3)
+    .setMaxRefAgeMs(tag, 1000)
+    .commit();
+```
+
+### Updating retention properties
+
+Retention properties for branches and tags can be updated as well.
+Use `setMaxRefAgeMs` to update the retention of the branch or tag reference itself. Branch snapshot retention properties can be updated via the `setMinSnapshotsToKeep` and `setMaxSnapshotAgeMs` APIs.
+
+```java
+String branch = "test-branch";
+// Update retention properties for test-branch
+table.manageSnapshots()
+    .setMinSnapshotsToKeep(branch, 10)
+    .setMaxSnapshotAgeMs(branch, 7200000)
+    .setMaxRefAgeMs(branch, 604800000)
+    .commit();
+
+// Update retention properties for test-tag
+table.manageSnapshots()
+    .setMaxRefAgeMs("test-tag", 604800000)
+    .commit();
+```
+
+### Removing branches and tags
+
+Branches and tags can be removed via the `removeBranch` and `removeTag` APIs, respectively.
+
+```java
+// Remove test-branch
+table.manageSnapshots()
+    .removeBranch("test-branch")
+    .commit();
+
+// Remove test-tag
+table.manageSnapshots()
+    .removeTag("test-tag")
+    .commit();
+```
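+
+Before removing a reference, it can be useful to list the branches and tags that currently exist on the table. A minimal sketch, assuming the table's named references are exposed through `Table.refs()`:
+
+```java
+// Print every named reference (branch or tag) and the snapshot it points to
+for (Map.Entry<String, SnapshotRef> entry : table.refs().entrySet()) {
+  SnapshotRef ref = entry.getValue();
+  String kind = ref.isBranch() ? "branch" : "tag";
+  System.out.println(entry.getKey() + " (" + kind + ") -> snapshot " + ref.snapshotId());
+}
+```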
diff --git a/docs/spark-ddl.md b/docs/spark-ddl.md
index 2f979b5443d6..9eaae5345e83 100644
--- a/docs/spark-ddl.md
+++ b/docs/spark-ddl.md
@@ -471,4 +471,64 @@ ALTER TABLE prod.db.sample DROP IDENTIFIER FIELDS id, data -- multiple columns
```

-Note that although the identifier is removed, the column will still exist in the table schema.
\ No newline at end of file
+Note that although the identifier is removed, the column will still exist in the table schema.
+
+### Branching and Tagging DDL
+
+#### `ALTER TABLE ... CREATE BRANCH`
+
+Branches can be created via the `CREATE BRANCH` statement, which includes
+the snapshot to create the branch at and an optional retention clause.
+
+```sql
+-- CREATE audit-branch at snapshot 1234 with default retention.
+ALTER TABLE prod.db.sample CREATE BRANCH `audit-branch`
+AS OF VERSION 1234
+
+-- CREATE audit-branch at snapshot 1234, retain the branch itself for 30 days,
+-- and keep the latest 3 snapshots plus 2 days' worth of snapshots on the branch.
+ALTER TABLE prod.db.sample CREATE BRANCH `audit-branch`
+AS OF VERSION 1234 RETAIN 30 DAYS
+WITH RETENTION 3 SNAPSHOTS 2 DAYS
+```
+
+#### `ALTER TABLE ... CREATE TAG`
+
+Tags can be created via the `CREATE TAG` statement, which includes
+the snapshot to create the tag at and an optional retention clause.
+
+```sql
+-- CREATE historical-tag at snapshot 1234 with default retention.
+ALTER TABLE prod.db.sample CREATE TAG `historical-tag` AS OF VERSION 1234
+
+-- CREATE historical-tag at snapshot 1234 and retain it for 1 year.
+ALTER TABLE prod.db.sample CREATE TAG `historical-tag`
+AS OF VERSION 1234 RETAIN 365 DAYS
+```
+
+#### `ALTER TABLE ... REPLACE BRANCH`
+
+The snapshot which a branch references can be updated via
+the `REPLACE BRANCH` statement. Retention can also be updated in the same statement.
+
+```sql
+-- REPLACE audit-branch to reference snapshot 4567 and update the retention to 60 days
+ALTER TABLE prod.db.sample REPLACE BRANCH `audit-branch`
+AS OF VERSION 4567 RETAIN 60 DAYS
+```
+
+#### `ALTER TABLE ... DROP BRANCH`
+
+Branches can be removed via the `DROP BRANCH` statement.
+
+```sql
+ALTER TABLE prod.db.sample DROP BRANCH `audit-branch`
+```
+
+#### `ALTER TABLE ... DROP TAG`
+
+Tags can be removed via the `DROP TAG` statement.
+
+```sql
+ALTER TABLE prod.db.sample DROP TAG `historical-tag`
+```
\ No newline at end of file
diff --git a/docs/spark-writes.md b/docs/spark-writes.md
index 08f788fe26bb..484aee8fe1b4 100644
--- a/docs/spark-writes.md
+++ b/docs/spark-writes.md
@@ -203,6 +203,36 @@ WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)
```

For more complex row-level updates based on incoming data, see the section on `MERGE INTO`.
+
+## Writing to Branches
+Branch writes can be performed via SQL by providing a branch identifier of the form `branch_yourBranch` in the operation.
+Branch writes can also be performed as part of a write-audit-publish (WAP) workflow by setting the `spark.wap.branch` config.
+Note that the WAP branch and a branch identifier cannot both be specified.
+Also, the branch must exist before the write is performed; the operation does **not** create the branch if it does not exist.
+For more information on branches, please refer to [branches](../../tables/branching).
+
+```sql
+-- INSERT (1, 'a'), (2, 'b') into the audit branch.
+INSERT INTO prod.db.table.branch_audit VALUES (1, 'a'), (2, 'b');
+
+-- MERGE INTO the audit branch
+MERGE INTO prod.db.table.branch_audit t
+USING (SELECT ...) s
+ON t.id = s.id
+WHEN ...
+
+-- UPDATE the audit branch
+UPDATE prod.db.table.branch_audit AS t1
+SET val = 'c';
+
+-- DELETE FROM the audit branch
+DELETE FROM prod.db.table.branch_audit WHERE id = 2;
+
+-- WAP branch write
+SET spark.wap.branch = 'audit-branch';
+INSERT INTO prod.db.table VALUES (3, 'c');
+```
+
## Writing with DataFrames

Spark 3 introduced the new `DataFrameWriterV2` API for writing to tables using data frames. The v2 API is recommended for several reasons: