@@ -24,11 +24,13 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.CatalogProperties;
@@ -37,6 +39,7 @@
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
@@ -105,6 +108,7 @@ public class SparkCatalog extends BaseCatalog {
private static final Splitter COMMA = Splitter.on(",");
private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");
private static final Pattern BRANCH = Pattern.compile("branch_(.*)");

private String catalogName = null;
private Catalog icebergCatalog = null;
@@ -654,6 +658,13 @@ private Table load(Identifier ident) {
return new SparkTable(table, snapshotId, !cacheEnabled);
}

Matcher branch = BRANCH.matcher(ident.name());
if (branch.matches()) {
Snapshot snapshot = table.snapshot(branch.group(1));
if (snapshot != null) {
return new SparkTable(table, snapshot.snapshotId(), !cacheEnabled);
}
}
// the name wasn't a valid snapshot selector and did not point to the changelog
// throw the original exception
throw e;
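
With the new BRANCH pattern, an identifier whose final name part is branch_<name> resolves to the head snapshot of that branch, the same way the existing at_timestamp_<millis> and snapshot_id_<id> selectors do. A hedged usage sketch, assuming a Spark session configured with an Iceberg catalog named demo and a pre-existing branch called audit (both names are illustrative, not part of the diff):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BranchSuffixExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("branch-suffix").master("local[*]").getOrCreate();

    // The trailing "branch_audit" name part is matched by the BRANCH pattern in
    // SparkCatalog.load(Identifier) and pinned to that branch's head snapshot.
    Dataset<Row> atBranchHead = spark.table("demo.db.events.branch_audit");
    atBranchHead.show();

    spark.stop();
  }
}
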
@@ -678,6 +689,7 @@ private Table loadFromPathIdentifier(PathIdentifier ident) {
String metadataTableName = null;
Long asOfTimestamp = null;
Long snapshotId = null;
String branch = null;
boolean isChangelog = false;

for (String meta : parsed.second()) {
@@ -701,12 +713,19 @@ private Table loadFromPathIdentifier(PathIdentifier ident) {
if (id.matches()) {
snapshotId = Long.parseLong(id.group(1));
}

Matcher ref = BRANCH.matcher(meta);
if (ref.matches()) {
branch = ref.group(1);
}
}

Preconditions.checkArgument(
asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot-id and as-of-timestamp: %s",
ident.location());
Stream.of(snapshotId, asOfTimestamp, branch).filter(Objects::nonNull).count() <= 1,
"Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), and snapshot-ref (%s)",
snapshotId,
asOfTimestamp,
branch);

Preconditions.checkArgument(
!isChangelog || (snapshotId == null && asOfTimestamp == null),
@@ -722,6 +741,9 @@ private Table loadFromPathIdentifier(PathIdentifier ident) {
long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);
return new SparkTable(table, snapshotIdAsOfTime, !cacheEnabled);

} else if (branch != null && table.snapshot(branch) != null) {
return new SparkTable(table, table.snapshot(branch).snapshotId(), !cacheEnabled);

} else {
return new SparkTable(table, snapshotId, !cacheEnabled);
}
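
The reworked precondition in loadFromPathIdentifier generalizes the old pairwise check into an "at most one selector" rule across snapshot-id, as-of-timestamp, and the new branch. A standalone sketch of the same idiom, with a plain IllegalArgumentException standing in for the Preconditions helper the diff uses:

import java.util.Objects;
import java.util.stream.Stream;

public class SelectorCheck {
  // Accepts nullable selectors; at most one may be set, mirroring the check above.
  static void requireAtMostOne(Long snapshotId, Long asOfTimestamp, String branch) {
    long set = Stream.of(snapshotId, asOfTimestamp, branch).filter(Objects::nonNull).count();
    if (set > 1) {
      throw new IllegalArgumentException(
          String.format(
              "Can specify at most one of snapshot-id (%s), as-of-timestamp (%s), and snapshot-ref (%s)",
              snapshotId, asOfTimestamp, branch));
    }
  }

  public static void main(String[] args) {
    requireAtMostOne(null, null, "audit"); // ok: a single selector
    requireAtMostOne(123L, null, "audit"); // throws: two selectors set at once
  }
}
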
@@ -67,6 +67,7 @@ public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions
"spark.sql.catalog." + DEFAULT_CACHE_CATALOG_NAME;
private static final String AT_TIMESTAMP = "at_timestamp_";
private static final String SNAPSHOT_ID = "snapshot_id_";
private static final String BRANCH = "branch_";
private static final String[] EMPTY_NAMESPACE = new String[0];

private static final SparkTableCache TABLE_CACHE = SparkTableCache.get();
@@ -124,6 +125,7 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri

Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
Long asOfTimestamp = propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
String branch = options.get(SparkReadOptions.BRANCH);
Preconditions.checkArgument(
asOfTimestamp == null || snapshotId == null,
"Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)",
@@ -140,6 +142,10 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri
selector = AT_TIMESTAMP + asOfTimestamp;
}

if (branch != null) {
selector = BRANCH + branch;
}

CatalogManager catalogManager = spark.sessionState().catalogManager();

if (TABLE_CACHE.contains(path)) {
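
On the DataFrame path, IcebergSource maps the new read option onto the same branch_<name> selector before handing the identifier to the catalog. A hedged usage sketch; the table location, branch name, and local master are placeholders, not part of the diff:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadBranchExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("read-branch").master("local[*]").getOrCreate();

    // "branch" is the key behind SparkReadOptions.BRANCH; IcebergSource turns it
    // into the "branch_<name>" selector handled by SparkCatalog above.
    Dataset<Row> df =
        spark.read()
            .format("iceberg")
            .option("branch", "audit")
            .load("/tmp/warehouse/db/events"); // path-based table, as in the tests below
    df.show();

    spark.stop();
  }
}
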
@@ -368,7 +368,7 @@ private static CaseInsensitiveStringMap addSnapshotId(
scanOptions.putAll(options.asCaseSensitiveMap());
scanOptions.put(SparkReadOptions.SNAPSHOT_ID, value);
scanOptions.remove(SparkReadOptions.AS_OF_TIMESTAMP);

scanOptions.remove(SparkReadOptions.BRANCH);
return new CaseInsensitiveStringMap(scanOptions);
}

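
The one-line addition to addSnapshotId keeps a pinned snapshot id from clashing with a leftover branch option; that kind of clash is what testSnapshotSelectionByTimestampAndBranchOrTagFails (whose tail appears below) reports as "Cannot override ref, already set snapshot id=". A minimal sketch of the rewrite, using the literal option keys the SparkReadOptions constants resolve to (treat those keys as assumptions):

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class PinSnapshotOptions {
  // Mirror of the scan-option rewrite: pin a snapshot id, drop the other selectors.
  static CaseInsensitiveStringMap pin(CaseInsensitiveStringMap options, long snapshotId) {
    Map<String, String> scanOptions = new HashMap<>(options.asCaseSensitiveMap());
    scanOptions.put("snapshot-id", String.valueOf(snapshotId)); // SparkReadOptions.SNAPSHOT_ID
    scanOptions.remove("as-of-timestamp");                      // SparkReadOptions.AS_OF_TIMESTAMP
    scanOptions.remove("branch");                               // SparkReadOptions.BRANCH
    return new CaseInsensitiveStringMap(scanOptions);
  }

  public static void main(String[] args) {
    Map<String, String> readOptions = new HashMap<>();
    readOptions.put("branch", "audit");
    CaseInsensitiveStringMap pinned = pin(new CaseInsensitiveStringMap(readOptions), 123L);
    System.out.println(pinned.asCaseSensitiveMap()); // {snapshot-id=123}
  }
}
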
@@ -370,4 +370,42 @@ public void testSnapshotSelectionByTimestampAndBranchOrTagFails() throws IOExcep
.isInstanceOf(IllegalArgumentException.class)
.hasMessageStartingWith("Cannot override ref, already set snapshot id=");
}

@Test
public void testSnapshotSelectionByBranchWithSchemaChange() throws IOException {
String tableLocation = temp.newFolder("iceberg-table").toString();

HadoopTables tables = new HadoopTables(CONF);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = tables.create(SCHEMA, spec, tableLocation);

// produce the first snapshot
List<SimpleRecord> firstBatchRecords =
Lists.newArrayList(
new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords, SimpleRecord.class);
firstDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);

table.manageSnapshots().createBranch("branch", table.currentSnapshot().snapshotId()).commit();

Dataset<Row> currentSnapshotResult =
spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
List<SimpleRecord> currentSnapshotRecords =
currentSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
List<SimpleRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(firstBatchRecords);
Assert.assertEquals(
"Current snapshot rows should match", expectedRecords, currentSnapshotRecords);

table.updateSchema().deleteColumn("data").commit();

Dataset<Row> deleteSnapshotResult =
spark.read().format("iceberg").option("branch", "branch").load(tableLocation);
List<SimpleRecord> deletedSnapshotRecords =
deleteSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
List<SimpleRecord> expectedRecordsAfterDeletion = Lists.newArrayList();
expectedRecordsAfterDeletion.addAll(firstBatchRecords);
Assert.assertEquals(
"Current snapshot rows should match", expectedRecordsAfterDeletion, deletedSnapshotRecords);
}
}