diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 2c42310f1a54..6d1a924d87cb 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -19,14 +19,18 @@
 
 package org.apache.iceberg.spark;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CachingCatalog;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.Transaction;
@@ -39,6 +43,7 @@
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -46,7 +51,9 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.spark.source.StagedSparkTable;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -84,6 +91,9 @@
  */
 public class SparkCatalog extends BaseCatalog {
   private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
+  private static final Splitter COMMA = Splitter.on(",");
+  private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
+  private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)");
 
   private String catalogName = null;
   private Catalog icebergCatalog = null;
@@ -122,8 +132,8 @@ protected TableIdentifier buildIdentifier(Identifier identifier) {
   @Override
   public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
     try {
-      Table icebergTable = load(ident);
-      return new SparkTable(icebergTable, !cacheEnabled);
+      Pair<Table, Long> icebergTable = load(ident);
+      return new SparkTable(icebergTable.first(), icebergTable.second(), !cacheEnabled);
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new NoSuchTableException(ident);
     }
@@ -224,7 +234,7 @@ public SparkTable alterTable(Identifier ident, TableChange... changes) throws No
     }
 
     try {
-      Table table = load(ident);
+      Table table = load(ident).first();
       commitChanges(table, setLocation, setSnapshotId, pickSnapshotId, propertyChanges, schemaChanges);
       return new SparkTable(table, true /* refreshEagerly */);
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
@@ -259,7 +269,7 @@ public void renameTable(Identifier from, Identifier to) throws NoSuchTableExcept
   @Override
   public void invalidateTable(Identifier ident) {
     try {
-      load(ident).refresh();
+      load(ident).first().refresh();
     } catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) {
       // ignore if the table doesn't exist, it is not cached
     }
@@ -471,10 +481,97 @@ private static void checkNotPathIdentifier(Identifier identifier, String method)
     }
   }
 
-  private Table load(Identifier ident) {
-    return isPathIdentifier(ident) ?
-        tables.load(((PathIdentifier) ident).location()) :
-        icebergCatalog.loadTable(buildIdentifier(ident));
+  private Pair<Table, Long> load(Identifier ident) {
+    if (isPathIdentifier(ident)) {
+      return loadFromPathIdentifier((PathIdentifier) ident);
+    }
+
+    try {
+      return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);
+
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      // if the original load didn't work, the identifier may be extended and include a snapshot selector
+      TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
+      Table table;
+      try {
+        table = icebergCatalog.loadTable(namespaceAsIdent);
+      } catch (org.apache.iceberg.exceptions.NoSuchTableException ignored) {
+        // the namespace does not identify a table, so it cannot be a table with a snapshot selector
+        // throw the original exception
+        throw e;
+      }
+
+      // loading the namespace as a table worked, check the name to see if it is a valid selector
+      Matcher at = AT_TIMESTAMP.matcher(ident.name());
+      if (at.matches()) {
+        long asOfTimestamp = Long.parseLong(at.group(1));
+        return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
+      }
+
+      Matcher id = SNAPSHOT_ID.matcher(ident.name());
+      if (id.matches()) {
+        long snapshotId = Long.parseLong(id.group(1));
+        return Pair.of(table, snapshotId);
+      }
+
+      // the name wasn't a valid snapshot selector. throw the original exception
+      throw e;
+    }
+  }
+
+  private Pair<String, List<String>> parseLocationString(String location) {
+    int hashIndex = location.lastIndexOf('#');
+    if (hashIndex != -1 && !location.endsWith("#")) {
+      String baseLocation = location.substring(0, hashIndex);
+      List<String> metadata = COMMA.splitToList(location.substring(hashIndex + 1));
+      return Pair.of(baseLocation, metadata);
+    } else {
+      return Pair.of(location, ImmutableList.of());
+    }
+  }
+
+  private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
+    Pair<String, List<String>> parsed = parseLocationString(ident.location());
+
+    String metadataTableName = null;
+    Long asOfTimestamp = null;
+    Long snapshotId = null;
+    for (String meta : parsed.second()) {
+      if (MetadataTableType.from(meta) != null) {
+        metadataTableName = meta;
+        continue;
+      }
+
+      Matcher at = AT_TIMESTAMP.matcher(meta);
+      if (at.matches()) {
+        asOfTimestamp = Long.parseLong(at.group(1));
+        continue;
+      }
+
+      Matcher id = SNAPSHOT_ID.matcher(meta);
+      if (id.matches()) {
+        snapshotId = Long.parseLong(id.group(1));
+      }
+    }
+
+    Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
+        "Cannot specify both snapshot-id and as-of-timestamp: %s", ident.location());
+
+    Table table = tables.load(parsed.first() + (metadataTableName != null ? "#" + metadataTableName : ""));
+
+    if (snapshotId != null) {
+      return Pair.of(table, snapshotId);
+    } else if (asOfTimestamp != null) {
+      return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
+    } else {
+      return Pair.of(table, null);
+    }
+  }
+
+  private Identifier namespaceToIdentifier(String[] namespace) {
+    String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
+    String name = namespace[ns.length];
+    return Identifier.of(ns, name);
   }
 
   private Catalog.TableBuilder newBuilder(Identifier ident, Schema schema) {
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 98fcc0e85cf4..7aa66b2223fc 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -19,11 +19,13 @@
 
 package org.apache.iceberg.spark.source;
 
+import java.util.Arrays;
 import java.util.Map;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.spark.PathIdentifier;
 import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSessionCatalog;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -56,6 +58,8 @@
 public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions {
   private static final String DEFAULT_CATALOG_NAME = "default_iceberg";
   private static final String DEFAULT_CATALOG = "spark.sql.catalog." + DEFAULT_CATALOG_NAME;
+  private static final String AT_TIMESTAMP = "at_timestamp_";
+  private static final String SNAPSHOT_ID = "snapshot_id_";
 
   @Override
   public String shortName() {
@@ -101,24 +105,52 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri
     SparkSession spark = SparkSession.active();
     setupDefaultSparkCatalog(spark);
     String path = options.get("path");
+
+    Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
+    Long asOfTimestamp = propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
+    Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
+        "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, asOfTimestamp);
+
+    String selector = null;
+
+    if (snapshotId != null) {
+      selector = SNAPSHOT_ID + snapshotId;
+    }
+
+    if (asOfTimestamp != null) {
+      selector = AT_TIMESTAMP + asOfTimestamp;
+    }
+
     CatalogManager catalogManager = spark.sessionState().catalogManager();
 
     if (path.contains("/")) {
       // contains a path. Return iceberg default catalog and a PathIdentifier
+      String newPath = (selector == null) ? path : path + "#" + selector;
       return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          new PathIdentifier(path));
+          new PathIdentifier(newPath));
     }
 
     final Spark3Util.CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier(
         "path or identifier", spark, path);
 
+    Identifier ident = identifierWithSelector(catalogAndIdentifier.identifier(), selector);
     if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
         !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
       // catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
-      return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          catalogAndIdentifier.identifier());
+      return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME), ident);
     } else {
-      return catalogAndIdentifier;
+      return new Spark3Util.CatalogAndIdentifier(catalogAndIdentifier.catalog(), ident);
+    }
+  }
+
+  private Identifier identifierWithSelector(Identifier ident, String selector) {
+    if (selector == null) {
+      return ident;
+    } else {
+      String[] namespace = ident.namespace();
+      String[] ns = Arrays.copyOf(namespace, namespace.length + 1);
+      ns[namespace.length] = ident.name();
+      return Identifier.of(ns, selector);
     }
   }
 
@@ -132,6 +164,15 @@ public String extractCatalog(CaseInsensitiveStringMap options) {
     return catalogAndIdentifier(options).catalog().name();
   }
 
+  private static Long propertyAsLong(CaseInsensitiveStringMap options, String property) {
+    String value = options.get(property);
+    if (value != null) {
+      return Long.parseLong(value);
+    }
+
+    return null;
+  }
+
   private static void setupDefaultSparkCatalog(SparkSession spark) {
     if (spark.conf().contains(DEFAULT_CATALOG)) {
       return;
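When the `path` option contains a `/`, the source above encodes the selector into the location after a `#`, and `SparkCatalog.parseLocationString` later splits it back apart. A rough round-trip sketch under that assumption (helper names and values are illustrative, not part of the patch):

```java
public class PathSelectorRoundTrip {

  // mirrors the encoding side: location + "#" + selector
  static String encode(String location, Long snapshotId, Long asOfTimestamp) {
    if (snapshotId != null) {
      return location + "#snapshot_id_" + snapshotId;
    } else if (asOfTimestamp != null) {
      return location + "#at_timestamp_" + asOfTimestamp;
    }
    return location;
  }

  // mirrors the decoding side: split on the last '#', ignoring a trailing '#'
  static String[] decode(String location) {
    int hashIndex = location.lastIndexOf('#');
    if (hashIndex != -1 && !location.endsWith("#")) {
      return new String[] {location.substring(0, hashIndex), location.substring(hashIndex + 1)};
    }
    return new String[] {location, ""};
  }

  public static void main(String[] args) {
    String encoded = encode("s3://bucket/warehouse/db/table", 10963874102873L, null);
    String[] parts = decode(encoded);
    // prints: s3://bucket/warehouse/db/table -> snapshot_id_10963874102873
    System.out.println(parts[0] + " -> " + parts[1]);
  }
}
```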
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 82fbdea8d6ea..07c85fb8de44 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -62,24 +62,17 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S
   private Filter[] pushedFilters = NO_FILTERS;
   private boolean ignoreResiduals = false;
 
-  SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
+  SparkScanBuilder(SparkSession spark, Table table, Schema schema, CaseInsensitiveStringMap options) {
     this.spark = spark;
     this.table = table;
+    this.schema = schema;
     this.readConf = new SparkReadConf(spark, table, options);
     this.options = options;
     this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
   }
 
-  private Schema lazySchema() {
-    if (schema == null) {
-      if (requestedProjection != null) {
-        // the projection should include all columns that will be returned, including those only used in filters
-        this.schema = SparkSchemaUtil.prune(table.schema(), requestedProjection, filterExpression(), caseSensitive);
-      } else {
-        this.schema = table.schema();
-      }
-    }
-    return schema;
+  SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
+    this(spark, table, table.schema(), options);
   }
 
   private Expression filterExpression() {
@@ -108,7 +101,7 @@ public Filter[] pushFilters(Filter[] filters) {
       Expression expr = SparkFilters.convert(filter);
       if (expr != null) {
         try {
-          Binder.bind(table.schema().asStruct(), expr, caseSensitive);
+          Binder.bind(schema.asStruct(), expr, caseSensitive);
           expressions.add(expr);
           pushed.add(filter);
         } catch (ValidationException e) {
@@ -136,6 +129,9 @@ public void pruneColumns(StructType requestedSchema) {
         .filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
         .toArray(StructField[]::new));
 
+    // the projection should include all columns that will be returned, including those only used in filters
+    this.schema = SparkSchemaUtil.prune(schema, requestedProjection, filterExpression(), caseSensitive);
+
     Stream.of(requestedSchema.fields())
         .map(StructField::name)
         .filter(MetadataColumns::isMetadataColumn)
@@ -157,7 +153,7 @@ private Schema schemaWithMetadataColumns() {
     Schema meta = new Schema(fields);
 
     // schema of rows returned by readers
-    return TypeUtil.join(lazySchema(), meta);
+    return TypeUtil.join(schema, meta);
   }
 
   @Override
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index c535a3534954..4031b89218d0 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -27,8 +27,10 @@
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFilters;
@@ -36,6 +38,7 @@
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.catalog.SupportsRead;
 import org.apache.spark.sql.connector.catalog.SupportsWrite;
@@ -76,7 +79,7 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
       TableCapability.OVERWRITE_DYNAMIC);
 
   private final Table icebergTable;
-  private final StructType requestedSchema;
+  private final Long snapshotId;
   private final boolean refreshEagerly;
   private StructType lazyTableSchema = null;
   private SparkSession lazySpark = null;
@@ -85,15 +88,10 @@ public SparkTable(Table icebergTable, boolean refreshEagerly) {
     this(icebergTable, null, refreshEagerly);
   }
 
-  public SparkTable(Table icebergTable, StructType requestedSchema, boolean refreshEagerly) {
+  public SparkTable(Table icebergTable, Long snapshotId, boolean refreshEagerly) {
     this.icebergTable = icebergTable;
-    this.requestedSchema = requestedSchema;
+    this.snapshotId = snapshotId;
    this.refreshEagerly = refreshEagerly;
-
-    if (requestedSchema != null) {
-      // convert the requested schema to throw an exception if any requested fields are unknown
-      SparkSchemaUtil.convert(icebergTable.schema(), requestedSchema);
-    }
   }
 
   private SparkSession sparkSession() {
@@ -113,14 +111,14 @@ public String name() {
     return icebergTable.toString();
   }
 
+  private Schema snapshotSchema() {
+    return SnapshotUtil.schemaFor(icebergTable, snapshotId, null);
+  }
+
   @Override
   public StructType schema() {
     if (lazyTableSchema == null) {
-      if (requestedSchema != null) {
-        this.lazyTableSchema = SparkSchemaUtil.convert(SparkSchemaUtil.prune(icebergTable.schema(), requestedSchema));
-      } else {
-        this.lazyTableSchema = SparkSchemaUtil.convert(icebergTable.schema());
-      }
+      this.lazyTableSchema = SparkSchemaUtil.convert(snapshotSchema());
     }
 
     return lazyTableSchema;
@@ -171,17 +169,16 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
       icebergTable.refresh();
     }
 
-    SparkScanBuilder scanBuilder = new SparkScanBuilder(sparkSession(), icebergTable, options);
-
-    if (requestedSchema != null) {
-      scanBuilder.pruneColumns(requestedSchema);
-    }
-
-    return scanBuilder;
+    CaseInsensitiveStringMap scanOptions = addSnapshotId(options, snapshotId);
+    return new SparkScanBuilder(sparkSession(), icebergTable, snapshotSchema(), scanOptions);
   }
 
   @Override
   public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+    Preconditions.checkArgument(
+        snapshotId == null,
+        "Cannot write to table at a specific snapshot: %s", snapshotId);
+
     if (info.options().containsKey(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID)) {
       // replace data files in the given file scan task set with new files
       return new SparkRewriteBuilder(sparkSession(), icebergTable, info);
@@ -212,6 +209,10 @@ private String getRowLevelOperationMode(String operation) {
   @Override
   public boolean canDeleteWhere(Filter[] filters) {
+    Preconditions.checkArgument(
+        snapshotId == null,
+        "Cannot delete from table at a specific snapshot: %s", snapshotId);
+
     if (table().specs().size() > 1) {
       // cannot guarantee a metadata delete will be successful if we have multiple specs
       return false;
     }
@@ -283,4 +284,22 @@ public int hashCode() {
     // use only name in order to correctly invalidate Spark cache
     return icebergTable.name().hashCode();
   }
+
+  private static CaseInsensitiveStringMap addSnapshotId(CaseInsensitiveStringMap options, Long snapshotId) {
+    if (snapshotId != null) {
+      String snapshotIdFromOptions = options.get(SparkReadOptions.SNAPSHOT_ID);
+      String value = snapshotId.toString();
+      Preconditions.checkArgument(snapshotIdFromOptions == null || snapshotIdFromOptions.equals(value),
+          "Cannot override snapshot ID more than once: %s", snapshotIdFromOptions);
+
+      Map<String, String> scanOptions = Maps.newHashMap();
+      scanOptions.putAll(options.asCaseSensitiveMap());
+      scanOptions.put(SparkReadOptions.SNAPSHOT_ID, value);
+      scanOptions.remove(SparkReadOptions.AS_OF_TIMESTAMP);
+
+      return new CaseInsensitiveStringMap(scanOptions);
+    }
+
+    return options;
+  }
 }
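`addSnapshotId` folds the selected snapshot into the scan options and drops any timestamp option, so the scan builder only ever sees a concrete snapshot id. A hedged sketch of that merge using plain maps (the real method works on Spark's `CaseInsensitiveStringMap`; the `"snapshot-id"` and `"as-of-timestamp"` keys are assumed to match the `SparkReadOptions` constants):

```java
import java.util.HashMap;
import java.util.Map;

public class SnapshotOptionMerge {

  static Map<String, String> addSnapshotId(Map<String, String> options, Long snapshotId) {
    if (snapshotId == null) {
      return options;
    }

    String existing = options.get("snapshot-id");  // assumed read-option key
    String value = snapshotId.toString();
    if (existing != null && !existing.equals(value)) {
      throw new IllegalArgumentException("Cannot override snapshot ID more than once: " + existing);
    }

    Map<String, String> merged = new HashMap<>(options);
    merged.put("snapshot-id", value);
    merged.remove("as-of-timestamp");              // the resolved id wins over any timestamp option
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("as-of-timestamp", "1621373461158");
    // prints: {snapshot-id=10963874102873}
    System.out.println(addSnapshotId(options, 10963874102873L));
  }
}
```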
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index c1c6a26435e4..78137d139dbf 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -55,7 +55,9 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.types.StructType;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -71,6 +73,17 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
       optional(2, "data", Types.StringType.get())
   );
 
+  private static final Schema SCHEMA2 = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get()),
+      optional(3, "category", Types.StringType.get())
+  );
+
+  private static final Schema SCHEMA3 = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(3, "category", Types.StringType.get())
+  );
+
   private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("id").build();
 
   @Rule
@@ -1142,6 +1155,223 @@ public void testPartitionsTable() {
     }
   }
 
+  @Test
+  public synchronized void testSnapshotReadAfterAddColumn() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+
+    List<Row> originalRecords = Lists.newArrayList(
+        RowFactory.create(1, "x"),
+        RowFactory.create(2, "y"),
+        RowFactory.create(3, "z"));
+
+    StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
+    inputDf.select("id", "data").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    Dataset<Row> resultDf = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf.orderBy("id").collectAsList());
+
+    Snapshot snapshotBeforeAddColumn = table.currentSnapshot();
+
+    table.updateSchema().addColumn("category", Types.StringType.get()).commit();
+
+    List<Row> newRecords = Lists.newArrayList(
+        RowFactory.create(4, "xy", "B"),
+        RowFactory.create(5, "xyz", "C"));
+
+    StructType newSparkSchema = SparkSchemaUtil.convert(SCHEMA2);
+    Dataset<Row> inputDf2 = spark.createDataFrame(newRecords, newSparkSchema);
+    inputDf2.select("id", "data", "category").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    List<Row> updatedRecords = Lists.newArrayList(
+        RowFactory.create(1, "x", null),
+        RowFactory.create(2, "y", null),
+        RowFactory.create(3, "z", null),
+        RowFactory.create(4, "xy", "B"),
+        RowFactory.create(5, "xyz", "C"));
+
+    Dataset<Row> resultDf2 = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", updatedRecords,
+        resultDf2.orderBy("id").collectAsList());
+
+    Dataset<Row> resultDf3 = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.SNAPSHOT_ID, snapshotBeforeAddColumn.snapshotId())
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf3.orderBy("id").collectAsList());
+    Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf3.schema());
+  }
+
+  @Test
+  public synchronized void testSnapshotReadAfterDropColumn() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
+    Table table = createTable(tableIdentifier, SCHEMA2, PartitionSpec.unpartitioned());
+
+    List<Row> originalRecords = Lists.newArrayList(
+        RowFactory.create(1, "x", "A"),
+        RowFactory.create(2, "y", "A"),
+        RowFactory.create(3, "z", "B"));
+
+    StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA2);
+    Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
+    inputDf.select("id", "data", "category").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    Dataset<Row> resultDf = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf.orderBy("id").collectAsList());
+
+    long tsBeforeDropColumn = waitUntilAfter(System.currentTimeMillis());
+    table.updateSchema().deleteColumn("data").commit();
+    long tsAfterDropColumn = waitUntilAfter(System.currentTimeMillis());
+
+    List<Row> newRecords = Lists.newArrayList(
+        RowFactory.create(4, "B"),
+        RowFactory.create(5, "C"));
+
+    StructType newSparkSchema = SparkSchemaUtil.convert(SCHEMA3);
+    Dataset<Row> inputDf2 = spark.createDataFrame(newRecords, newSparkSchema);
+    inputDf2.select("id", "category").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    List<Row> updatedRecords = Lists.newArrayList(
+        RowFactory.create(1, "A"),
+        RowFactory.create(2, "A"),
+        RowFactory.create(3, "B"),
+        RowFactory.create(4, "B"),
+        RowFactory.create(5, "C"));
+
+    Dataset<Row> resultDf2 = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", updatedRecords,
+        resultDf2.orderBy("id").collectAsList());
+
+    Dataset<Row> resultDf3 = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.AS_OF_TIMESTAMP, tsBeforeDropColumn)
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf3.orderBy("id").collectAsList());
+    Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf3.schema());
+
+    // At tsAfterDropColumn, there has been a schema change, but no new snapshot,
+    // so the snapshot as of tsAfterDropColumn is the same as that as of tsBeforeDropColumn.
+    Dataset<Row> resultDf4 = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.AS_OF_TIMESTAMP, tsAfterDropColumn)
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf4.orderBy("id").collectAsList());
+    Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf4.schema());
+  }
+
+  @Test
+  public synchronized void testSnapshotReadAfterAddAndDropColumn() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+
+    List<Row> originalRecords = Lists.newArrayList(
+        RowFactory.create(1, "x"),
+        RowFactory.create(2, "y"),
+        RowFactory.create(3, "z"));
+
+    StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
+    inputDf.select("id", "data").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    Dataset<Row> resultDf = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf.orderBy("id").collectAsList());
+
+    Snapshot snapshotBeforeAddColumn = table.currentSnapshot();
+
+    table.updateSchema().addColumn("category", Types.StringType.get()).commit();
+
+    List<Row> newRecords = Lists.newArrayList(
+        RowFactory.create(4, "xy", "B"),
+        RowFactory.create(5, "xyz", "C"));
+
+    StructType sparkSchemaAfterAddColumn = SparkSchemaUtil.convert(SCHEMA2);
+    Dataset<Row> inputDf2 = spark.createDataFrame(newRecords, sparkSchemaAfterAddColumn);
+    inputDf2.select("id", "data", "category").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    List<Row> updatedRecords = Lists.newArrayList(
+        RowFactory.create(1, "x", null),
+        RowFactory.create(2, "y", null),
+        RowFactory.create(3, "z", null),
+        RowFactory.create(4, "xy", "B"),
+        RowFactory.create(5, "xyz", "C"));
+
+    Dataset<Row> resultDf2 = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", updatedRecords,
+        resultDf2.orderBy("id").collectAsList());
+
+    table.updateSchema().deleteColumn("data").commit();
+
+    List<Row> recordsAfterDropColumn = Lists.newArrayList(
+        RowFactory.create(1, null),
+        RowFactory.create(2, null),
+        RowFactory.create(3, null),
+        RowFactory.create(4, "B"),
+        RowFactory.create(5, "C"));
+
+    Dataset<Row> resultDf3 = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", recordsAfterDropColumn,
+        resultDf3.orderBy("id").collectAsList());
+
+    Dataset<Row> resultDf4 = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.SNAPSHOT_ID, snapshotBeforeAddColumn.snapshotId())
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf4.orderBy("id").collectAsList());
+    Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf4.schema());
+  }
+
   @Test
   public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
index 6016dd2212ad..cce987a781c0 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
@@ -33,6 +33,7 @@
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 
 public class TestDeleteFrom extends SparkCatalogTestBase {
@@ -71,6 +72,29 @@ public void testDeleteFromUnpartitionedTable() throws NoSuchTableException {
         0L, scalarSql("SELECT count(1) FROM %s", tableName));
   }
 
+  @Test
+  public void testDeleteFromTableAtSnapshot() throws NoSuchTableException {
+    Assume.assumeFalse(
+        "Spark session catalog does not support extended table names",
+        "spark_catalog".equals(catalogName));
+
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+    df.coalesce(1).writeTo(tableName).append();
+
+    long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+    String prefix = "snapshot_id_";
+    AssertHelpers.assertThrows("Should not be able to delete from a table at a specific snapshot",
+        IllegalArgumentException.class, "Cannot delete from table at a specific snapshot",
+        () -> sql("DELETE FROM %s.%s WHERE id < 4", tableName, prefix + snapshotId));
+  }
+
   @Test
   public void testDeleteFromPartitionedTable() throws NoSuchTableException {
     sql("CREATE TABLE %s (id bigint, data string) " +
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
index 846e234cba07..38dfe0e5afda 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
@@ -21,12 +21,16 @@
 
 import java.util.List;
 import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.events.Listeners;
 import org.apache.iceberg.events.ScanEvent;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -120,4 +124,83 @@ public void testMetadataTables() {
         ImmutableList.of(row(ANY, ANY, null, "append", ANY, ANY)),
         sql("SELECT * FROM %s.snapshots", tableName));
   }
+
+  @Test
+  public void testSnapshotInTableName() {
+    Assume.assumeFalse(
+        "Spark session catalog does not support extended table names",
+        "spark_catalog".equals(catalogName));
+
+    // get the snapshot ID of the last write and get the current row set as expected
+    long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+    List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+
+    // create a second snapshot
+    sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
+
+    String prefix = "snapshot_id_";
+    // read the table at the snapshot
+    List<Object[]> actual = sql("SELECT * FROM %s.%s", tableName, prefix + snapshotId);
+    assertEquals("Snapshot at specific ID, prefix " + prefix, expected, actual);
+
+    // read the table using DataFrameReader option
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.SNAPSHOT_ID, snapshotId)
+        .load(tableName);
+    List<Object[]> fromDF = rowsToJava(df.collectAsList());
+    assertEquals("Snapshot at specific ID " + snapshotId, expected, fromDF);
+  }
+
+  @Test
+  public void testTimestampInTableName() {
+    Assume.assumeFalse(
+        "Spark session catalog does not support extended table names",
+        "spark_catalog".equals(catalogName));
+
+    // get a timestamp just after the last write and get the current row set as expected
+    long snapshotTs = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis();
+    long timestamp = waitUntilAfter(snapshotTs + 2);
+    List<Object[]> expected = sql("SELECT * FROM %s", tableName);
+
+    // create a second snapshot
+    sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
+
+    String prefix = "at_timestamp_";
+    // read the table at the snapshot
+    List<Object[]> actual = sql("SELECT * FROM %s.%s", tableName, prefix + timestamp);
+    assertEquals("Snapshot at timestamp, prefix " + prefix, expected, actual);
+
+    // read the table using DataFrameReader option
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp)
+        .load(tableName);
+    List<Object[]> fromDF = rowsToJava(df.collectAsList());
+    assertEquals("Snapshot at timestamp " + timestamp, expected, fromDF);
+  }
+
+  @Test
+  public void testSpecifySnapshotAndTimestamp() {
+    // get the snapshot ID of the last write
+    long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+    // get a timestamp just after the last write
+    long timestamp = validationCatalog.loadTable(tableIdent).currentSnapshot().timestampMillis() + 2;
+
+    // create a second snapshot
+    sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0)", tableName);
+
+    AssertHelpers.assertThrows("Should not be able to specify both snapshot id and timestamp",
+        IllegalArgumentException.class,
+        String.format("Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)",
+            snapshotId, timestamp),
+        () -> {
+          spark.read()
+              .format("iceberg")
+              .option(SparkReadOptions.SNAPSHOT_ID, snapshotId)
+              .option(SparkReadOptions.AS_OF_TIMESTAMP, timestamp)
+              .load(tableName)
+              .collectAsList();
+        });
+  }
 }
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java
index 988df10e0e18..7a6ea0996e22 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java
@@ -21,6 +21,7 @@
 
 import java.util.List;
 import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.spark.SparkCatalogTestBase;
 import org.apache.iceberg.spark.source.SimpleRecord;
@@ -30,6 +31,7 @@
 import org.apache.spark.sql.functions;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -84,6 +86,32 @@ public void testInsertOverwrite() {
     assertEquals("Row data should match expected", expected, sql("SELECT * FROM %s ORDER BY id", tableName));
   }
 
+  @Test
+  public void testInsertAppendAtSnapshot() {
+    Assume.assumeFalse(
+        "Spark session catalog does not support extended table names",
+        "spark_catalog".equals(catalogName));
+
+    long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+    String prefix = "snapshot_id_";
+    AssertHelpers.assertThrows("Should not be able to insert into a table at a specific snapshot",
+        IllegalArgumentException.class, "Cannot write to table at a specific snapshot",
+        () -> sql("INSERT INTO %s.%s VALUES (4, 'd'), (5, 'e')", tableName, prefix + snapshotId));
+  }
+
+  @Test
+  public void testInsertOverwriteAtSnapshot() {
+    Assume.assumeFalse(
+        "Spark session catalog does not support extended table names",
+        "spark_catalog".equals(catalogName));
+
+    long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
+    String prefix = "snapshot_id_";
+    AssertHelpers.assertThrows("Should not be able to insert into a table at a specific snapshot",
+        IllegalArgumentException.class, "Cannot write to table at a specific snapshot",
+        () -> sql("INSERT OVERWRITE %s.%s VALUES (4, 'd'), (5, 'e')", tableName, prefix + snapshotId));
+  }
+
   @Test
   public void testDataFrameV2Append() throws NoSuchTableException {
     Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) FROM %s", tableName));
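The write-side tests only assert the failure mode; for completeness, a hedged sketch of the DataFrameReader options that the table-name selectors translate into (the identifier and values are placeholders):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadOptionTimeTravel {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("read-option-time-travel")
        .getOrCreate();

    String table = "prod.db.table"; // placeholder identifier

    // read a specific snapshot by id
    Dataset<Row> atSnapshot = spark.read()
        .format("iceberg")
        .option("snapshot-id", 10963874102873L)
        .load(table);
    atSnapshot.show();

    // read the snapshot that was current at a point in time (millis since epoch)
    Dataset<Row> atTime = spark.read()
        .format("iceberg")
        .option("as-of-timestamp", 1621373461158L)
        .load(table);
    atTime.show();

    // setting both options at once, or writing through a snapshot-selector table name,
    // fails with IllegalArgumentException, as the tests above assert
  }
}
```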