34 changes: 34 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -19,9 +19,14 @@

package org.apache.iceberg.util;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.function.Function;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
@@ -30,9 +35,34 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class SnapshotUtil {
private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

private SnapshotUtil() {
}

/**
* Returns the ID of the most recent snapshot for the table as of the timestamp.
*
* @param table a {@link Table}
* @param timestampMillis the timestamp in millis since the Unix epoch
* @return the snapshot ID
* @throws IllegalArgumentException when no snapshot is found in the table
* older than the timestamp
*/
public static long snapshotIdAsOfTime(Table table, long timestampMillis) {
Long snapshotId = null;
for (HistoryEntry logEntry : table.history()) {
if (logEntry.timestampMillis() <= timestampMillis) {
snapshotId = logEntry.snapshotId();
}
}

Preconditions.checkArgument(snapshotId != null,
"Cannot find a snapshot older than %s", formatTimestampMillis(timestampMillis));

return snapshotId;
}

/**
* Returns whether ancestorSnapshotId is an ancestor of snapshotId.
*/
@@ -144,4 +174,8 @@ public static Snapshot snapshotAfter(Table table, long snapshotId) {
throw new IllegalStateException(
String.format("Cannot find snapshot after %s: not an ancestor of table's current snapshot", snapshotId));
}

private static String formatTimestampMillis(long millis) {
return DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC));
}
}
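
For illustration only (not part of the patch), a minimal sketch of calling the new SnapshotUtil.snapshotIdAsOfTime helper; the table argument and the timestamp value are placeholder assumptions:

import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;

public class SnapshotAsOfTimeExample {
  // Returns the ID of the snapshot that was current at the given (made-up) point in time.
  // snapshotIdAsOfTime throws IllegalArgumentException if no snapshot is older than the timestamp.
  public static long snapshotAtExampleTime(Table table) {
    long timestampMillis = 1633046400000L; // 2021-10-01T00:00:00Z, for example
    return SnapshotUtil.snapshotIdAsOfTime(table, timestampMillis);
  }
}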
113 changes: 105 additions & 8 deletions spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -19,14 +19,18 @@

package org.apache.iceberg.spark;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
@@ -39,13 +43,16 @@
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.StagedSparkTable;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -80,6 +87,9 @@
*/
public class SparkCatalog extends BaseCatalog {
private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
private static final Splitter COMMA = Splitter.on(",");
private static final Pattern AT_TIME = Pattern.compile("at(?:_(?:time(?:stamp)?)?)?_?(\\d+)");
Review comment (Contributor):
What's the reason for supporting so many different ways to specify the time and snapshot ID? My opinion is that we should support just one shortcut and one full name for each, for example at_ and at_timestamp_ for timestamp travel, and s_ and snapshot_id_ for snapshot ID travel. Maybe even the full-name forms are not necessary.

Reply (rdblue, PR author, Oct 11, 2021):
I may have gone a bit too far here. I think we want to be able to use the full "at_timestamp_12938471" version because it is the least likely to conflict with existing table names. Similarly, I think it is valuable to have a short version, like at_<timestamp> and snap_<id>, so you don't have to type at_timestamp_ or the full snapshot_id_ every time. Since we were already testing multiple prefixes, I added a few that I thought would help avoid confusion:

  • Make the final _ optional because that's an easy mistake to make
  • Allow time and not just timestamp because people may not remember which is accepted
  • Allow omitting _id, and accept snap_ or s_ as shortened forms

The logic made sense at every step, but there are quite a few variations. I'm open to defining the full version and one shortened form if you think it's best to have just a couple.

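For illustration only (not part of the patch), a small standalone sketch of which selector suffixes the two patterns discussed here accept (AT_TIME above and SNAPSHOT_ID on the next diff line); the sample timestamps and snapshot IDs are made up:

import java.util.regex.Pattern;

public class SelectorPatternDemo {
  // Patterns copied from the diff; the sample inputs below are made-up values.
  private static final Pattern AT_TIME = Pattern.compile("at(?:_(?:time(?:stamp)?)?)?_?(\\d+)");
  private static final Pattern SNAPSHOT_ID = Pattern.compile("s(?:nap(?:shot)?)?(?:_id)?_?(\\d+)");

  public static void main(String[] args) {
    // Each of these forms matches AT_TIME and captures the millisecond timestamp:
    for (String name : new String[] {
        "at_timestamp_1633046400000", "at_time_1633046400000", "at_1633046400000", "at1633046400000"}) {
      System.out.println(name + " matches AT_TIME: " + AT_TIME.matcher(name).matches());
    }
    // Each of these forms matches SNAPSHOT_ID and captures the snapshot ID:
    for (String name : new String[] {
        "snapshot_id_10963874102873", "snap_10963874102873", "s_10963874102873"}) {
      System.out.println(name + " matches SNAPSHOT_ID: " + SNAPSHOT_ID.matcher(name).matches());
    }
  }
}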
private static final Pattern SNAPSHOT_ID = Pattern.compile("s(?:nap(?:shot)?)?(?:_id)?_?(\\d+)");

private String catalogName = null;
private Catalog icebergCatalog = null;
@@ -118,8 +128,8 @@ protected TableIdentifier buildIdentifier(Identifier identifier) {
@Override
public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
try {
Table icebergTable = load(ident);
return new SparkTable(icebergTable, !cacheEnabled);
Pair<Table, Long> icebergTable = load(ident);
return new SparkTable(icebergTable.first(), icebergTable.second(), !cacheEnabled);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
}
@@ -220,7 +230,7 @@ public SparkTable alterTable(Identifier ident, TableChange... changes) throws No
}

try {
Table table = load(ident);
Table table = load(ident).first();
commitChanges(table, setLocation, setSnapshotId, pickSnapshotId, propertyChanges, schemaChanges);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
@@ -256,7 +266,7 @@ public void renameTable(Identifier from, Identifier to) throws NoSuchTableExcept
@Override
public void invalidateTable(Identifier ident) {
try {
load(ident).refresh();
load(ident).first().refresh();
} catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) {
// ignore if the table doesn't exist, it is not cached
}
@@ -456,10 +466,97 @@ private static void checkNotPathIdentifier(Identifier identifier, String method)
}
}

private Table load(Identifier ident) {
return isPathIdentifier(ident) ?
tables.load(((PathIdentifier) ident).location()) :
icebergCatalog.loadTable(buildIdentifier(ident));
private Pair<Table, Long> load(Identifier ident) {
if (isPathIdentifier(ident)) {
return loadFromPathIdentifier((PathIdentifier) ident);
}

try {
return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);

} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
// if the original load didn't work, the identifier may be extended and include a snapshot selector
TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
Review comment (Contributor):
So we are also using "." as the delimiter for table names, similar to how system tables are parsed today. However, since the system table parsing logic is in core, should we also move this logic to core so that all engines obey the same table name format?

Reply (rdblue, PR author):
I debated that as well. I'm not sure where it would be cleanest to put this, which is why I added it just to Spark for now. I think we will also want to extend this to work for branch or tag names, at which point we may want to reconsider moving everything into core.

Reply (rdblue, PR author):
I forgot one other thing. This is a more reasonable change if we do it in Spark, because SparkTable is a pretty thin wrapper around Iceberg's Table. If we did this in core, we would have to update BaseTable to select a version, which is pretty strange with the API, or introduce a wrapper table type that only allows reading. Since we already have the wrapper table in Spark, it seemed easiest to update that for now and not over-complicate the core Table API.

Reply (nastra, Contributor, Nov 18, 2021):
I think we may want to support the timestamp as part of the TableIdentifier, as that would align better with what we're planning for Snapshot Branching/Tagging, where the TableIdentifier needs to understand which reference we're currently at. Similarly, it would also understand the timestamp. I'll update https://docs.google.com/document/d/1KSgkVYnIMlWEbAT1qSnnnLS-gc0kdgHlWR6Ud08HOhA/edit#heading=h.zf5ulr1b1ytv to also respect timestamps.

Table table;
try {
table = icebergCatalog.loadTable(namespaceAsIdent);
} catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) {
// the namespace does not identify a table, so it cannot be a table with a snapshot selector
// throw the original exception
throw e;
}

// loading the namespace as a table worked, check the name to see if it is a valid selector
Matcher at = AT_TIME.matcher(ident.name());
if (at.matches()) {
long asOfTimestamp = Long.parseLong(at.group(1));
return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
}

Matcher id = SNAPSHOT_ID.matcher(ident.name());
if (id.matches()) {
long snapshotId = Long.parseLong(id.group(1));
return Pair.of(table, snapshotId);
}

// the name wasn't a valid snapshot selector. throw the original exception
throw e;
}
}
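
For illustration (not part of the diff), a hedged sketch of reading through such an extended identifier from Spark, assuming this catalog is registered in the session as my_catalog and that db.tbl is an existing Iceberg table; the selector values are made up:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TimeTravelReadSketch {
  // Spark parses the multi-part name into namespace ["db", "tbl"] and name "at_timestamp_...",
  // and load() above resolves the trailing part as a snapshot selector.
  public static Dataset<Row> readAsOfTimestamp(SparkSession spark) {
    return spark.read().table("my_catalog.db.tbl.at_timestamp_1633046400000");
  }

  public static Dataset<Row> readAtSnapshot(SparkSession spark) {
    return spark.read().table("my_catalog.db.tbl.snapshot_id_10963874102873");
  }
}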

private Pair<String, List<String>> parseLocationString(String location) {
int hashIndex = location.lastIndexOf('#');
if (hashIndex != -1 && !location.endsWith("#")) {
String baseLocation = location.substring(0, hashIndex);
List<String> metadata = COMMA.splitToList(location.substring(hashIndex + 1));
return Pair.of(baseLocation, metadata);
} else {
return Pair.of(location, ImmutableList.of());
}
}

private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
Pair<String, List<String>> parsed = parseLocationString(ident.location());

String metadataTableName = null;
Long asOfTimestamp = null;
Long snapshotId = null;
for (String meta : parsed.second()) {
if (MetadataTableType.from(meta) != null) {
metadataTableName = meta;
continue;
}

Matcher at = AT_TIME.matcher(meta);
if (at.matches()) {
asOfTimestamp = Long.parseLong(at.group(1));
continue;
}

Matcher id = SNAPSHOT_ID.matcher(meta);
if (id.matches()) {
snapshotId = Long.parseLong(id.group(1));
}
}

Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
"Cannot specify as-of-timestamp and snapshot-id: %s", ident.location());

Table table = tables.load(parsed.first() + (metadataTableName != null ? "#" + metadataTableName : ""));

if (snapshotId != null) {
return Pair.of(table, snapshotId);
} else if (asOfTimestamp != null) {
return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
} else {
return Pair.of(table, null);
}
}
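
For illustration (not part of the diff), a standalone sketch of the location parsing used above for path identifiers: the fragment after the last '#' is a comma-separated list that may contain a metadata table name plus either an at_... or a snapshot_id_... selector, but not both of the latter. The example location is made up:

import java.util.Collections;
import java.util.List;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.util.Pair;

public class PathSelectorSketch {
  private static final Splitter COMMA = Splitter.on(",");

  // Mirrors parseLocationString above: split a location like "/warehouse/db/tbl#files,at_123"
  // into the base location and its selector list.
  static Pair<String, List<String>> parse(String location) {
    int hashIndex = location.lastIndexOf('#');
    if (hashIndex != -1 && !location.endsWith("#")) {
      return Pair.of(location.substring(0, hashIndex),
          COMMA.splitToList(location.substring(hashIndex + 1)));
    }
    List<String> none = Collections.emptyList();
    return Pair.of(location, none);
  }

  public static void main(String[] args) {
    Pair<String, List<String>> parsed = parse("/warehouse/db/tbl#files,at_timestamp_1633046400000");
    System.out.println(parsed.first());   // /warehouse/db/tbl
    System.out.println(parsed.second());  // [files, at_timestamp_1633046400000]
  }
}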

private Identifier namespaceToIdentifier(String[] namespace) {
String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
String name = namespace[ns.length];
return Identifier.of(ns, name);
}

private Catalog.TableBuilder newBuilder(Identifier ident, Schema schema) {
spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -21,6 +21,7 @@

import java.util.Map;
import java.util.Set;
import org.apache.arrow.util.Preconditions;
Review comment (Contributor):
Is there a reason to use this instead of com.google.common.base.Preconditions?

Reply (rdblue, PR author):
No. And this shouldn't be allowed. We have a rule that checks for the wrong one in imports. Looks like we now need to add the Arrow package.

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -29,6 +30,7 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkFilters;
@@ -76,7 +78,7 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
TableCapability.OVERWRITE_DYNAMIC);

private final Table icebergTable;
private final StructType requestedSchema;
private final Long snapshotId;
private final boolean refreshEagerly;
private StructType lazyTableSchema = null;
private SparkSession lazySpark = null;
@@ -85,15 +87,10 @@ public SparkTable(Table icebergTable, boolean refreshEagerly) {
this(icebergTable, null, refreshEagerly);
}

public SparkTable(Table icebergTable, StructType requestedSchema, boolean refreshEagerly) {
public SparkTable(Table icebergTable, Long snapshotId, boolean refreshEagerly) {
this.icebergTable = icebergTable;
this.requestedSchema = requestedSchema;
this.snapshotId = snapshotId;
this.refreshEagerly = refreshEagerly;

if (requestedSchema != null) {
// convert the requested schema to throw an exception if any requested fields are unknown
SparkSchemaUtil.convert(icebergTable.schema(), requestedSchema);
}
}

private SparkSession sparkSession() {
@@ -116,11 +113,8 @@ public String name() {
@Override
public StructType schema() {
if (lazyTableSchema == null) {
if (requestedSchema != null) {
this.lazyTableSchema = SparkSchemaUtil.convert(SparkSchemaUtil.prune(icebergTable.schema(), requestedSchema));
} else {
this.lazyTableSchema = SparkSchemaUtil.convert(icebergTable.schema());
}
// TODO: return the schema of the snapshot if it is set
this.lazyTableSchema = SparkSchemaUtil.convert(icebergTable.schema());
}

return lazyTableSchema;
@@ -171,17 +165,15 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
icebergTable.refresh();
}

SparkScanBuilder scanBuilder = new SparkScanBuilder(sparkSession(), icebergTable, options);

if (requestedSchema != null) {
scanBuilder.pruneColumns(requestedSchema);
}

return scanBuilder;
return new SparkScanBuilder(sparkSession(), icebergTable, addSnapshotId(options, snapshotId));
}

@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
Preconditions.checkArgument(
snapshotId == null,
"Cannot write to table at a specific snapshot: %s", snapshotId);

if (info.options().containsKey(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID)) {
// replace data files in the given file scan task set with new files
return new SparkRewriteBuilder(sparkSession(), icebergTable, info);
@@ -192,6 +184,10 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {

@Override
public MergeBuilder newMergeBuilder(String operation, LogicalWriteInfo info) {
Preconditions.checkArgument(
snapshotId == null,
"Cannot write to table at a specific snapshot: %s", snapshotId);

String mode = getRowLevelOperationMode(operation);
ValidationException.check(mode.equals("copy-on-write"), "Unsupported mode for %s: %s", operation, mode);
return new SparkMergeBuilder(sparkSession(), icebergTable, operation, info);
@@ -212,6 +208,10 @@ private String getRowLevelOperationMode(String operation) {

@Override
public boolean canDeleteWhere(Filter[] filters) {
Preconditions.checkArgument(
snapshotId == null,
"Cannot delete from table at a specific snapshot: %s", snapshotId);

if (table().specs().size() > 1) {
// cannot guarantee a metadata delete will be successful if we have multiple specs
return false;
@@ -283,4 +283,19 @@ public int hashCode() {
// use only name in order to correctly invalidate Spark cache
return icebergTable.name().hashCode();
}

private static CaseInsensitiveStringMap addSnapshotId(CaseInsensitiveStringMap options, Long snapshotId) {
if (snapshotId != null) {
Preconditions.checkArgument(options.get(SparkReadOptions.SNAPSHOT_ID) == null,
"Cannot override snapshot ID more than once: %s", options.get(SparkReadOptions.SNAPSHOT_ID));

Map<String, String> scanOptions = Maps.newHashMap();
scanOptions.putAll(options.asCaseSensitiveMap());
scanOptions.put(SparkReadOptions.SNAPSHOT_ID, String.valueOf(snapshotId));

return new CaseInsensitiveStringMap(scanOptions);
}

return options;
}
}
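
For illustration (not part of the diff), a hedged sketch of what addSnapshotId does to the scan options: the selected snapshot ID is copied into the read options unless a snapshot-id option is already present (in which case the Preconditions check above fails). The option keys and values here are made up, except "snapshot-id", which is assumed to be the value of SparkReadOptions.SNAPSHOT_ID:

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class SnapshotOptionSketch {
  public static void main(String[] args) {
    // Options as they might arrive from a DataFrame read; the split-size value is made up.
    Map<String, String> base = Maps.newHashMap();
    base.put("split-size", "134217728");
    CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(base);

    // Same idea as addSnapshotId above: copy the options and add the selected snapshot ID.
    Map<String, String> withSnapshot = Maps.newHashMap(options.asCaseSensitiveMap());
    withSnapshot.put("snapshot-id", String.valueOf(10963874102873L));
    CaseInsensitiveStringMap scanOptions = new CaseInsensitiveStringMap(withSnapshot);

    System.out.println(scanOptions.get("snapshot-id"));  // 10963874102873
  }
}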