diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index e8906819ab61..f942c5189289 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -19,10 +19,6 @@
 
 package org.apache.iceberg;
 
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -37,6 +33,8 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,8 +45,6 @@ abstract class BaseTableScan implements TableScan {
 
   private static final Logger LOG = LoggerFactory.getLogger(BaseTableScan.class);
 
-  private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
-
   private final TableOperations ops;
   private final Table table;
   private final Schema schema;
@@ -132,19 +128,7 @@ public TableScan asOfTime(long timestampMillis) {
     Preconditions.checkArgument(context.snapshotId() == null,
         "Cannot override snapshot, already set to id=%s", context.snapshotId());
 
-    Long lastSnapshotId = null;
-    for (HistoryEntry logEntry : ops.current().snapshotLog()) {
-      if (logEntry.timestampMillis() <= timestampMillis) {
-        lastSnapshotId = logEntry.snapshotId();
-      }
-    }
-
-    // the snapshot ID could be null if no entries were older than the requested time. in that case,
-    // there is no valid snapshot to read.
-    Preconditions.checkArgument(lastSnapshotId != null,
-        "Cannot find a snapshot older than %s", formatTimestampMillis(timestampMillis));
-
-    return useSnapshot(lastSnapshotId);
+    return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), timestampMillis));
   }
 
   @Override
@@ -199,7 +183,7 @@ public CloseableIterable<FileScanTask> planFiles() {
     Snapshot snapshot = snapshot();
     if (snapshot != null) {
       LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table,
-          snapshot.snapshotId(), formatTimestampMillis(snapshot.timestampMillis()),
+          snapshot.snapshotId(), DateTimeUtil.formatTimestampMillis(snapshot.timestampMillis()),
           context.rowFilter());
 
       Listeners.notifyAll(
@@ -304,8 +288,4 @@ private Schema lazyColumnProjection() {
     return schema;
   }
-
-  private static String formatTimestampMillis(long millis) {
-    return DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC));
-  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index fdcd5ed35645..58a7c9bbdbd6 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -23,6 +23,7 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.ThreadPools;
 
 public class DataTableScan extends BaseTableScan {
@@ -62,6 +63,15 @@ public TableScan appendsAfter(long fromSnapshotId) {
     return appendsBetween(fromSnapshotId, currentSnapshot.snapshotId());
   }
 
+  @Override
+  public TableScan useSnapshot(long scanSnapshotId) {
+    // call method in superclass just for the side effect of argument validation;
+    // we do not use its return value
+    super.useSnapshot(scanSnapshotId);
+    Schema snapshotSchema = SnapshotUtil.schemaFor(table(), scanSnapshotId);
+    return newRefinedScan(tableOps(), table(), snapshotSchema, context().useSnapshotId(scanSnapshotId));
+  }
+
   @Override
   protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
     return new DataTableScan(ops, table, schema, context);
diff --git a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
index 2dcaa9fa9407..6b115b07b445 100644
--- a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
@@ -25,12 +25,15 @@
 import java.time.LocalTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
 
 public class DateTimeUtil {
   private DateTimeUtil() {
   }
 
+  private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
   public static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
   public static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
   public static final long MICROS_PER_MILLIS = 1000L;
@@ -81,4 +84,8 @@ public static OffsetDateTime timestamptzFromMicros(long microsFromEpoch) {
   public static long microsFromTimestamptz(OffsetDateTime dateTime) {
     return ChronoUnit.MICROS.between(EPOCH, dateTime);
   }
+
+  public static String formatTimestampMillis(long millis) {
+    return DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC));
+  }
 }
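Note: the formatter moves verbatim from BaseTableScan into DateTimeUtil, so log and error messages keep the same UTC yyyy-MM-dd HH:mm:ss.SSS rendering. A minimal sketch of the helper's behavior (the class name and epoch value below are illustrative only):

import org.apache.iceberg.util.DateTimeUtil;

public class FormatTimestampMillisDemo {
  public static void main(String[] args) {
    long millis = 1609459200123L; // 2021-01-01T00:00:00.123Z
    // expected output: 2021-01-01 00:00:00.123 (millisecond precision, UTC, no zone suffix)
    System.out.println(DateTimeUtil.formatTimestampMillis(millis));
  }
}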
diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
index beafcc29c90c..b2efe2948bfe 100644
--- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -22,6 +22,8 @@
 import java.util.List;
 import java.util.function.Function;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.HistoryEntry;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -144,4 +146,50 @@ public static Snapshot snapshotAfter(Table table, long snapshotId) {
     throw new IllegalStateException(
         String.format("Cannot find snapshot after %s: not an ancestor of table's current snapshot", snapshotId));
   }
+
+  /**
+   * Returns the ID of the most recent snapshot for the table as of the timestamp.
+   *
+   * @param table a {@link Table}
+   * @param timestampMillis the timestamp in millis since the Unix epoch
+   * @return the snapshot ID
+   * @throws IllegalArgumentException when no snapshot is found in the table
+   *                                  older than the timestamp
+   */
+  public static long snapshotIdAsOfTime(Table table, long timestampMillis) {
+    Long snapshotId = null;
+    for (HistoryEntry logEntry : table.history()) {
+      if (logEntry.timestampMillis() <= timestampMillis) {
+        snapshotId = logEntry.snapshotId();
+      }
+    }
+
+    Preconditions.checkArgument(snapshotId != null,
+        "Cannot find a snapshot older than %s", DateTimeUtil.formatTimestampMillis(timestampMillis));
+    return snapshotId;
+  }
+
+  /**
+   * Returns the schema of the table for the specified snapshot.
+   *
+   * @param table a {@link Table}
+   * @param snapshotId the ID of the snapshot
+   * @return the schema
+   */
+  public static Schema schemaFor(Table table, long snapshotId) {
+    Snapshot snapshot = table.snapshot(snapshotId);
+    Preconditions.checkArgument(snapshot != null, "Cannot find snapshot with ID %s", snapshotId);
+    Integer schemaId = snapshot.schemaId();
+
+    // schemaId could be null, if snapshot was created before Iceberg added schema id to snapshot
+    if (schemaId != null) {
+      Schema schema = table.schemas().get(schemaId);
+      Preconditions.checkState(schema != null,
+          "Cannot find schema with schema id %s", schemaId);
+      return schema;
+    }
+
+    // TODO: recover the schema by reading previous metadata files
+    return table.schema();
+  }
 }
diff --git a/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index f0bc4ee521d4..8976d666f168 100644
--- a/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -68,8 +68,8 @@ public DataSourceReader createReader(StructType readSchema, DataSourceOptions op
     Reader reader = new Reader(lazySparkSession(), table, Boolean.parseBoolean(caseSensitive), options);
 
     if (readSchema != null) {
-      // convert() will fail if readSchema contains fields not in table.schema()
-      SparkSchemaUtil.convert(table.schema(), readSchema);
+      // convert() will fail if readSchema contains fields not in reader.snapshotSchema()
+      SparkSchemaUtil.convert(reader.snapshotSchema(), readSchema);
       reader.pruneColumns(readSchema);
     }
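Taken together, the two new SnapshotUtil helpers are what DataTableScan.useSnapshot relies on to pick the snapshot's schema. A rough usage sketch (the wrapper class and method below are hypothetical; only the SnapshotUtil calls come from this change):

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;

public class SnapshotSchemaSketch {
  // Resolve the snapshot that was current at timestampMillis and the schema it was written with.
  static Schema schemaAsOfTime(Table table, long timestampMillis) {
    // throws IllegalArgumentException if no snapshot is older than the timestamp
    long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, timestampMillis);
    // falls back to table.schema() for snapshots written before schema IDs were tracked
    return SnapshotUtil.schemaFor(table, snapshotId);
  }
}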
diff --git a/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index a8a2b2f0e51f..2ac570dc8530 100644
--- a/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -86,14 +86,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
   private final JavaSparkContext sparkContext;
   private final Table table;
   private final SparkReadConf readConf;
-  private final Long snapshotId;
-  private final Long startSnapshotId;
-  private final Long endSnapshotId;
-  private final Long asOfTimestamp;
-  private final Long splitSize;
-  private final Integer splitLookback;
-  private final Long splitOpenFileCost;
-  private final boolean caseSensitive;
+  private final TableScan baseScan;
   private StructType requestedSchema = null;
   private List<Expression> filterExpressions = null;
   private Filter[] pushedFilters = NO_FILTERS;
@@ -111,31 +104,9 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
     this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
     this.readConf = new SparkReadConf(spark, table, options.asMap());
-    this.snapshotId = readConf.snapshotId();
-    this.asOfTimestamp = readConf.asOfTimestamp();
-    if (snapshotId != null && asOfTimestamp != null) {
-      throw new IllegalArgumentException(
-          "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
-    }
-
-    this.startSnapshotId = readConf.startSnapshotId();
-    this.endSnapshotId = readConf.endSnapshotId();
-    if (snapshotId != null || asOfTimestamp != null) {
-      if (startSnapshotId != null || endSnapshotId != null) {
-        throw new IllegalArgumentException(
-            "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id or " +
-            "as-of-timestamp is specified");
-      }
-    } else {
-      if (startSnapshotId == null && endSnapshotId != null) {
-        throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
-      }
-    }
-
-    // look for split behavior overrides in options
-    this.splitSize = options.get(SparkReadOptions.SPLIT_SIZE).map(Long::parseLong).orElse(null);
-    this.splitLookback = options.get(SparkReadOptions.LOOKBACK).map(Integer::parseInt).orElse(null);
-    this.splitOpenFileCost = options.get(SparkReadOptions.FILE_OPEN_COST).map(Long::parseLong).orElse(null);
+    this.baseScan = configureBaseScan(caseSensitive, options);
+    this.schema = baseScan.schema();
 
     if (table.io() instanceof HadoopFileIO) {
       String fsscheme = "no_exist";
@@ -157,18 +128,84 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
       this.localityPreferred = false;
     }
 
-    this.schema = table.schema();
-    this.caseSensitive = caseSensitive;
     this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
   }
 
+  private void validateOptions(
+      Long snapshotId, Long asOfTimestamp, Long startSnapshotId, Long endSnapshotId) {
+    if (snapshotId != null && asOfTimestamp != null) {
+      throw new IllegalArgumentException(
+          "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
+    }
+
+    if ((snapshotId != null || asOfTimestamp != null) &&
+        (startSnapshotId != null || endSnapshotId != null)) {
+      throw new IllegalArgumentException(
+          "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id or " +
+          "as-of-timestamp is specified");
+    }
+
+    if (startSnapshotId == null && endSnapshotId != null) {
+      throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
incremental scan"); + } + } + + private TableScan configureBaseScan(boolean caseSensitive, DataSourceOptions options) { + Long snapshotId = readConf.snapshotId(); + Long asOfTimestamp = readConf.asOfTimestamp(); + Long startSnapshotId = readConf.startSnapshotId(); + Long endSnapshotId = readConf.endSnapshotId(); + validateOptions(snapshotId, asOfTimestamp, startSnapshotId, endSnapshotId); + + TableScan scan = table.newScan().caseSensitive(caseSensitive); + + if (snapshotId != null) { + scan = scan.useSnapshot(snapshotId); + } + + if (asOfTimestamp != null) { + scan = scan.asOfTime(asOfTimestamp); + } + + if (startSnapshotId != null) { + if (endSnapshotId != null) { + scan = scan.appendsBetween(startSnapshotId, endSnapshotId); + } else { + scan = scan.appendsAfter(startSnapshotId); + } + } + + // look for split behavior overrides in options + Long splitSize = options.get(SparkReadOptions.SPLIT_SIZE).map(Long::parseLong).orElse(null); + if (splitSize != null) { + scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString()); + } + + Integer splitLookback = options.get(SparkReadOptions.LOOKBACK).map(Integer::parseInt).orElse(null); + if (splitLookback != null) { + scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString()); + } + + Long splitOpenFileCost = options.get(SparkReadOptions.FILE_OPEN_COST).map(Long::parseLong).orElse(null); + if (splitOpenFileCost != null) { + scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString()); + } + + return scan; + } + + protected Schema snapshotSchema() { + return baseScan.schema(); + } + private Schema lazySchema() { if (schema == null) { if (requestedSchema != null) { // the projection should include all columns that will be returned, including those only used in filters - this.schema = SparkSchemaUtil.prune(table.schema(), requestedSchema, filterExpression(), caseSensitive); + this.schema = SparkSchemaUtil.prune( + baseScan.schema(), requestedSchema, filterExpression(), baseScan.isCaseSensitive()); } else { - this.schema = table.schema(); + this.schema = baseScan.schema(); } } return schema; @@ -211,6 +248,7 @@ public List> planBatchInputPartitions() { Broadcast tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table)); List scanTasks = tasks(); + boolean caseSensitive = baseScan.isCaseSensitive(); InputPartition[] readTasks = new InputPartition[scanTasks.size()]; Tasks.range(readTasks.length) @@ -235,6 +273,7 @@ public List> planInputPartitions() { Broadcast
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
     List<CombinedScanTask> scanTasks = tasks();
+    boolean caseSensitive = baseScan.isCaseSensitive();
 
     InputPartition[] readTasks = new InputPartition[scanTasks.size()];
     Tasks.range(readTasks.length)
@@ -378,38 +417,7 @@ private static void mergeIcebergHadoopConfs(
 
   private List<CombinedScanTask> tasks() {
     if (tasks == null) {
-      TableScan scan = table
-          .newScan()
-          .caseSensitive(caseSensitive)
-          .project(lazySchema());
-
-      if (snapshotId != null) {
-        scan = scan.useSnapshot(snapshotId);
-      }
-
-      if (asOfTimestamp != null) {
-        scan = scan.asOfTime(asOfTimestamp);
-      }
-
-      if (startSnapshotId != null) {
-        if (endSnapshotId != null) {
-          scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
-        } else {
-          scan = scan.appendsAfter(startSnapshotId);
-        }
-      }
-
-      if (splitSize != null) {
-        scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
-      }
-
-      if (splitLookback != null) {
-        scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
-      }
-
-      if (splitOpenFileCost != null) {
-        scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
-      }
+      TableScan scan = baseScan.project(lazySchema());
 
       if (filterExpressions != null) {
         for (Expression filter : filterExpressions) {
@@ -430,8 +438,8 @@ private List<CombinedScanTask> tasks() {
   @Override
   public String toString() {
     return String.format(
-        "IcebergScan(table=%s, type=%s, filters=%s, caseSensitive=%s, batchedReads=%s)",
-        table, lazySchema().asStruct(), filterExpressions, caseSensitive, enableBatchRead());
+        "IcebergScan(table=%s, type=%s, filters=%s, batchedReads=%s)",
+        table, lazySchema().asStruct(), filterExpressions, enableBatchRead());
   }
 
   private static class ReadTask implements Serializable, InputPartition {
diff --git a/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
index a51f9ee85e2f..76923d43a3bc 100644
--- a/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
+++ b/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
@@ -44,11 +44,14 @@ public static void start() {
 
   @After
   public void dropTable() throws IOException {
-    Table table = catalog.loadTable(currentIdentifier);
-    Path tablePath = new Path(table.location());
-    FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
-    fs.delete(tablePath, true);
-    catalog.dropTable(currentIdentifier, false);
+    if (currentIdentifier != null) {
+      Table table = catalog.loadTable(currentIdentifier);
+      Path tablePath = new Path(table.location());
+      FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
+      fs.delete(tablePath, true);
+      catalog.dropTable(currentIdentifier, false);
+      currentIdentifier = null;
+    }
   }
 
   @Override
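With the Reader now built around a single configured TableScan, the user-facing reads look like the following hedged sketch (the class, method names, and location argument are placeholders; the option keys are the SparkReadOptions constants exercised in the tests below). The returned DataFrame now carries the schema of the selected snapshot rather than the table's current schema:

import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TimeTravelReadSketch {
  // Pin the read to a specific snapshot id; the DataFrame schema is that snapshot's schema.
  static Dataset<Row> readSnapshot(SparkSession spark, String tableLocation, long snapshotId) {
    return spark.read()
        .format("iceberg")
        .option(SparkReadOptions.SNAPSHOT_ID, Long.toString(snapshotId))
        .load(tableLocation);
  }

  // Pin the read to a timestamp; it resolves to the latest snapshot committed at or before it.
  static Dataset<Row> readAsOfTime(SparkSession spark, String tableLocation, long timestampMillis) {
    return spark.read()
        .format("iceberg")
        .option(SparkReadOptions.AS_OF_TIMESTAMP, Long.toString(timestampMillis))
        .load(tableLocation);
  }
}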
diff --git a/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 10b9d6f3030c..93a3bf13189f 100644
--- a/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -43,6 +43,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTestBase;
@@ -52,7 +53,9 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.types.StructType;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -68,6 +71,17 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
       optional(2, "data", Types.StringType.get())
   );
 
+  private static final Schema SCHEMA2 = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get()),
+      optional(3, "category", Types.StringType.get())
+  );
+
+  private static final Schema SCHEMA3 = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(3, "category", Types.StringType.get())
+  );
+
   private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("id").build();
 
   @Rule
@@ -1111,7 +1125,7 @@ public void testPartitionsTable() {
     // check time travel
     List<Row> actualAfterFirstCommit = spark.read()
         .format("iceberg")
-        .option("snapshot-id", String.valueOf(firstCommitId))
+        .option(SparkReadOptions.SNAPSHOT_ID, String.valueOf(firstCommitId))
         .load(loadLocation(tableIdentifier, "partitions"))
         .orderBy("partition.id")
         .collectAsList();
@@ -1139,6 +1153,223 @@
     }
   }
 
+  @Test
+  public synchronized void testSnapshotReadAfterAddColumn() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+
+    List<Row> originalRecords = Lists.newArrayList(
+        RowFactory.create(1, "x"),
+        RowFactory.create(2, "y"),
+        RowFactory.create(3, "z"));
+
+    StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
+    inputDf.select("id", "data").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    Dataset<Row> resultDf = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf.orderBy("id").collectAsList());
+
+    Snapshot snapshot1 = table.currentSnapshot();
+
+    table.updateSchema().addColumn("category", Types.StringType.get()).commit();
+
+    List<Row> newRecords = Lists.newArrayList(
+        RowFactory.create(4, "xy", "B"),
+        RowFactory.create(5, "xyz", "C"));
+
+    StructType newSparkSchema = SparkSchemaUtil.convert(SCHEMA2);
+    Dataset<Row> inputDf2 = spark.createDataFrame(newRecords, newSparkSchema);
+    inputDf2.select("id", "data", "category").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    List<Row> updatedRecords = Lists.newArrayList(
+        RowFactory.create(1, "x", null),
+        RowFactory.create(2, "y", null),
+        RowFactory.create(3, "z", null),
+        RowFactory.create(4, "xy", "B"),
+        RowFactory.create(5, "xyz", "C"));
+
+    Dataset<Row> resultDf2 = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", updatedRecords,
+        resultDf2.orderBy("id").collectAsList());
+
+    Dataset<Row> resultDf3 = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.SNAPSHOT_ID, snapshot1.snapshotId())
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf3.orderBy("id").collectAsList());
+    Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf3.schema());
+  }
+
+  @Test
+  public synchronized void testSnapshotReadAfterDropColumn() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
+    Table table = createTable(tableIdentifier, SCHEMA2, PartitionSpec.unpartitioned());
+
+    List<Row> originalRecords = Lists.newArrayList(
+        RowFactory.create(1, "x", "A"),
+        RowFactory.create(2, "y", "A"),
+        RowFactory.create(3, "z", "B"));
+
+    StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA2);
+    Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
+    inputDf.select("id", "data", "category").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    Dataset<Row> resultDf = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf.orderBy("id").collectAsList());
+
+    long tsBeforeDropColumn = waitUntilAfter(System.currentTimeMillis());
+    table.updateSchema().deleteColumn("data").commit();
+    long tsAfterDropColumn = waitUntilAfter(System.currentTimeMillis());
+
+    List<Row> newRecords = Lists.newArrayList(
+        RowFactory.create(4, "B"),
+        RowFactory.create(5, "C"));
+
+    StructType newSparkSchema = SparkSchemaUtil.convert(SCHEMA3);
+    Dataset<Row> inputDf2 = spark.createDataFrame(newRecords, newSparkSchema);
+    inputDf2.select("id", "category").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    List<Row> updatedRecords = Lists.newArrayList(
+        RowFactory.create(1, "A"),
+        RowFactory.create(2, "A"),
+        RowFactory.create(3, "B"),
+        RowFactory.create(4, "B"),
+        RowFactory.create(5, "C"));
+
+    Dataset<Row> resultDf2 = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", updatedRecords,
+        resultDf2.orderBy("id").collectAsList());
+
+    Dataset<Row> resultDf3 = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.AS_OF_TIMESTAMP, tsBeforeDropColumn)
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf3.orderBy("id").collectAsList());
+    Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf3.schema());
+
+    // At tsAfterDropColumn, there has been a schema change, but no new snapshot,
+    // so the snapshot as of tsAfterDropColumn is the same as that as of tsBeforeDropColumn.
+    Dataset<Row> resultDf4 = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.AS_OF_TIMESTAMP, tsAfterDropColumn)
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf4.orderBy("id").collectAsList());
+    Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf4.schema());
+  }
+
+  @Test
+  public synchronized void testSnapshotReadAfterAddAndDropColumn() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+
+    List<Row> originalRecords = Lists.newArrayList(
+        RowFactory.create(1, "x"),
+        RowFactory.create(2, "y"),
+        RowFactory.create(3, "z"));
+
+    StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
+    inputDf.select("id", "data").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    Dataset<Row> resultDf = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf.orderBy("id").collectAsList());
+
+    Snapshot snapshot1 = table.currentSnapshot();
+
+    table.updateSchema().addColumn("category", Types.StringType.get()).commit();
+
+    List<Row> newRecords = Lists.newArrayList(
+        RowFactory.create(4, "xy", "B"),
+        RowFactory.create(5, "xyz", "C"));
+
+    StructType sparkSchemaAfterAddColumn = SparkSchemaUtil.convert(SCHEMA2);
+    Dataset<Row> inputDf2 = spark.createDataFrame(newRecords, sparkSchemaAfterAddColumn);
+    inputDf2.select("id", "data", "category").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    List<Row> updatedRecords = Lists.newArrayList(
+        RowFactory.create(1, "x", null),
+        RowFactory.create(2, "y", null),
+        RowFactory.create(3, "z", null),
+        RowFactory.create(4, "xy", "B"),
+        RowFactory.create(5, "xyz", "C"));
+
+    Dataset<Row> resultDf2 = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", updatedRecords,
+        resultDf2.orderBy("id").collectAsList());
+
+    table.updateSchema().deleteColumn("data").commit();
+
+    List<Row> recordsAfterDropColumn = Lists.newArrayList(
+        RowFactory.create(1, null),
+        RowFactory.create(2, null),
+        RowFactory.create(3, null),
+        RowFactory.create(4, "B"),
+        RowFactory.create(5, "C"));
+
+    Dataset<Row> resultDf3 = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", recordsAfterDropColumn,
+        resultDf3.orderBy("id").collectAsList());
+
+    Dataset<Row> resultDf4 = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.SNAPSHOT_ID, snapshot1.snapshotId())
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf4.orderBy("id").collectAsList());
+    Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf4.schema());
+  }
+
   @Test
   public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
diff --git a/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
index a51f9ee85e2f..76923d43a3bc 100644
--- a/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
+++ b/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
@@ -44,11 +44,14 @@ public static void start() {
 
   @After
   public void dropTable() throws IOException {
-    Table table = catalog.loadTable(currentIdentifier);
-    Path tablePath = new Path(table.location());
-    FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
-    fs.delete(tablePath, true);
-    catalog.dropTable(currentIdentifier, false);
+    if (currentIdentifier != null) {
+      Table table = catalog.loadTable(currentIdentifier);
+      Path tablePath = new Path(table.location());
+      FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
+      fs.delete(tablePath, true);
+      catalog.dropTable(currentIdentifier, false);
+      currentIdentifier = null;
+    }
   }
 
   @Override
diff --git a/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 10b9d6f3030c..fc6d1c5f6b64 100644
--- a/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -43,6 +43,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTestBase;
@@ -1111,7 +1112,7 @@ public void testPartitionsTable() {
     // check time travel
     List<Row> actualAfterFirstCommit = spark.read()
         .format("iceberg")
-        .option("snapshot-id", String.valueOf(firstCommitId))
+        .option(SparkReadOptions.SNAPSHOT_ID, String.valueOf(firstCommitId))
         .load(loadLocation(tableIdentifier, "partitions"))
         .orderBy("partition.id")
         .collectAsList();
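For reference, a condensed sketch of the contract the new tests pin down (the helper class, method, and parameters are illustrative only): after the table schema evolves, a snapshot-pinned read should still expose the Spark schema that snapshot was written with.

import org.apache.iceberg.Schema;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.Assert;

public class SnapshotSchemaAssertionSketch {
  static void assertSnapshotSchema(SparkSession spark, String location, long oldSnapshotId, Schema oldSchema) {
    Dataset<Row> df = spark.read()
        .format("iceberg")
        .option(SparkReadOptions.SNAPSHOT_ID, oldSnapshotId)
        .load(location);
    // both the rows and the schema come from the pinned snapshot
    Assert.assertEquals("Schemas should match", SparkSchemaUtil.convert(oldSchema), df.schema());
  }
}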