diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java b/api/src/main/java/org/apache/iceberg/Snapshot.java index 3fd8714f9840..9362d0527800 100644 --- a/api/src/main/java/org/apache/iceberg/Snapshot.java +++ b/api/src/main/java/org/apache/iceberg/Snapshot.java @@ -126,4 +126,11 @@ public interface Snapshot extends Serializable { * @return the location of the manifest list for this Snapshot */ String manifestListLocation(); + + /** + * Return the id of the table schema associated with this snapshot. + * + * @return the schema id, usable as a key into the table metadata's schemas-by-id map + */ + int schemaId(); } diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java index 95e6d621f238..8be50b3ace80 100644 --- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java @@ -43,6 +43,7 @@ class BaseSnapshot implements Snapshot { private final String manifestListLocation; private final String operation; private final Map summary; + private final int schemaId; // lazily initialized private transient List allManifests = null; @@ -59,7 +60,7 @@ class BaseSnapshot implements Snapshot { String... 
manifestFiles) { this(io, snapshotId, null, System.currentTimeMillis(), null, null, Lists.transform(Arrays.asList(manifestFiles), - path -> new GenericManifestFile(io.newInputFile(path), 0))); + path -> new GenericManifestFile(io.newInputFile(path), 0)), 0); } BaseSnapshot(FileIO io, @@ -69,7 +70,8 @@ class BaseSnapshot implements Snapshot { long timestampMillis, String operation, Map summary, - String manifestList) { + String manifestList, + int schemaId) { this.io = io; this.sequenceNumber = sequenceNumber; this.snapshotId = snapshotId; @@ -78,6 +80,7 @@ class BaseSnapshot implements Snapshot { this.operation = operation; this.summary = summary; this.manifestListLocation = manifestList; + this.schemaId = schemaId; } BaseSnapshot(FileIO io, @@ -86,8 +89,9 @@ class BaseSnapshot implements Snapshot { long timestampMillis, String operation, Map summary, - List dataManifests) { - this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, null); + List dataManifests, + int schemaId) { + this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, null, schemaId); this.allManifests = dataManifests; } @@ -180,6 +184,11 @@ public String manifestListLocation() { return manifestListLocation; } + @Override + public int schemaId() { + return schemaId; + } + private void cacheChanges() { ImmutableList.Builder adds = ImmutableList.builder(); ImmutableList.Builder deletes = ImmutableList.builder(); diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java index 356d909f6bba..506508c07079 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java @@ -123,8 +123,11 @@ public TableScan useSnapshot(long scanSnapshotId) { "Cannot override snapshot, already set to id=%s", context.snapshotId()); Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null, "Cannot 
find snapshot with ID %s", scanSnapshotId); + + int schemaId = ops.current().snapshot(scanSnapshotId).schemaId(); + return newRefinedScan( - ops, table, schema, context.useSnapshotId(scanSnapshotId)); + ops, table, ops.current().schemasById().get(schemaId), context.useSnapshotId(scanSnapshotId)); } @Override diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java b/core/src/main/java/org/apache/iceberg/SnapshotParser.java index cf13c35b02ff..558630aae054 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java @@ -46,6 +46,7 @@ private SnapshotParser() { private static final String OPERATION = "operation"; private static final String MANIFESTS = "manifests"; private static final String MANIFEST_LIST = "manifest-list"; + private static final String SCHEMA_ID = "schema-id"; static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException { @@ -58,6 +59,7 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) generator.writeNumberField(PARENT_SNAPSHOT_ID, snapshot.parentId()); } generator.writeNumberField(TIMESTAMP_MS, snapshot.timestampMillis()); + generator.writeNumberField(SCHEMA_ID, snapshot.schemaId()); // if there is an operation, write the summary map if (snapshot.operation() != null) { @@ -119,6 +121,8 @@ static Snapshot fromJson(FileIO io, JsonNode node) { } long timestamp = JsonUtil.getLong(TIMESTAMP_MS, node); + // metadata written before snapshots tracked a schema has no schema-id field, so default instead of failing + int schemaId = node.has(SCHEMA_ID) ? JsonUtil.getInt(SCHEMA_ID, node) : TableMetadata.INITIAL_SCHEMA_ID; Map summary = null; String operation = null; if (node.has(SUMMARY)) { @@ -142,14 +146,15 @@ static Snapshot fromJson(FileIO io, JsonNode node) { if (node.has(MANIFEST_LIST)) { // the manifest list is stored in a manifest list file String manifestList = JsonUtil.getString(MANIFEST_LIST, node); - return new BaseSnapshot(io, sequenceNumber, snapshotId, parentId, timestamp, operation, summary, manifestList); + return new BaseSnapshot(io, sequenceNumber, snapshotId, parentId, timestamp, 
operation, summary, manifestList, + schemaId); } else { // fall back to an embedded manifest list. pass in the manifest's InputFile so length can be // loaded lazily, if it is needed List manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node), location -> new GenericManifestFile(io.newInputFile(location), 0)); - return new BaseSnapshot(io, snapshotId, parentId, timestamp, operation, summary, manifests); + return new BaseSnapshot(io, snapshotId, parentId, timestamp, operation, summary, manifests, schemaId); } } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 3a179f99fbe4..142d30547d8c 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -188,12 +188,12 @@ public Snapshot apply() { return new BaseSnapshot(ops.io(), sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base), - manifestList.location()); + manifestList.location(), base.currentSchemaId()); } else { return new BaseSnapshot(ops.io(), snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base), - manifests); + manifests, base.currentSchemaId()); } } diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java index 664cdc5756fd..bf587514f99b 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java @@ -62,7 +62,7 @@ public void testJsonConversionWithOperation() { Snapshot expected = new BaseSnapshot(ops.io(), id, parentId, System.currentTimeMillis(), DataOperations.REPLACE, ImmutableMap.of("files-added", "4", "files-deleted", "100"), - manifests); + manifests, 0); String json = SnapshotParser.toJson(expected); Snapshot snapshot = SnapshotParser.fromJson(ops.io(), json); @@ -102,9 +102,9 @@ 
public void testJsonConversionWithManifestList() throws IOException { } Snapshot expected = new BaseSnapshot( - ops.io(), id, 34, parentId, System.currentTimeMillis(), null, null, localInput(manifestList).location()); + ops.io(), id, 34, parentId, System.currentTimeMillis(), null, null, localInput(manifestList).location(), 0); Snapshot inMemory = new BaseSnapshot( - ops.io(), id, parentId, expected.timestampMillis(), null, null, manifests); + ops.io(), id, parentId, expected.timestampMillis(), null, null, manifests, 0); Assert.assertEquals("Files should match in memory list", inMemory.allManifests(), expected.allManifests()); diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index a7b30a6cdd7a..46d190b5fa15 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -90,11 +90,11 @@ public void testJsonConversion() throws Exception { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), 0); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())), 0); List snapshotLog = ImmutableList.builder() .add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId())) @@ -169,11 +169,11 @@ public void testBackwardCompat() throws Exception { 
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())), 0); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())), 0); TableMetadata expected = new TableMetadata(null, 1, null, TEST_LOCATION, 0, System.currentTimeMillis(), 3, TableMetadata.INITIAL_SCHEMA_ID, @@ -228,7 +228,7 @@ public void testBackwardCompat() throws Exception { previousSnapshot.allManifests(), metadata.snapshot(previousSnapshotId).allManifests()); Assert.assertEquals("Snapshot logs should match", - expected.previousFiles(), metadata.previousFiles()); + expected.previousFiles(), metadata.previousFiles()); } private static String toJsonWithoutSpecAndSchemaList(TableMetadata metadata) { @@ -281,11 +281,11 @@ public void testJsonWithPreviousMetadataLog() throws Exception { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), 0); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of( - new 
GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())), 0); List reversedSnapshotLog = Lists.newArrayList(); long currentTimestamp = System.currentTimeMillis(); @@ -312,11 +312,11 @@ public void testAddPreviousMetadataRemoveNone() { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), 0); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())), 0); List reversedSnapshotLog = Lists.newArrayList(); long currentTimestamp = System.currentTimeMillis(); @@ -352,11 +352,11 @@ public void testAddPreviousMetadataRemoveOne() { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), 0); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); + new 
GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())), 0); List reversedSnapshotLog = Lists.newArrayList(); long currentTimestamp = System.currentTimeMillis(); @@ -404,11 +404,11 @@ public void testAddPreviousMetadataRemoveMultiple() { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), 0); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); + new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())), 0); List reversedSnapshotLog = Lists.newArrayList(); long currentTimestamp = System.currentTimeMillis(); diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java index fa871e505129..42cc7e49798e 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java @@ -86,6 +86,10 @@ public static Schema convert(Schema baseSchema, TableSchema flinkSchema) { return FlinkFixupTypes.fixup(schema, baseSchema); } + public static Schema fixup(Schema schema, Schema baseSchema) { + return FlinkFixupTypes.fixup(schema, baseSchema); + } + /** * Convert a {@link Schema} to a {@link RowType Flink type}. 
* diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java index 84507c411bcc..507cdb054bfb 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java @@ -31,8 +31,10 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.data.RowData; +import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableScan; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.expressions.Expression; @@ -42,6 +44,9 @@ import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; public class FlinkSource { private FlinkSource() { @@ -74,6 +79,7 @@ public static class Builder { private Table table; private TableLoader tableLoader; private TableSchema projectedSchema; + private Long scanSnapshotId; private ReadableConfig readableConfig = new Configuration(); private final ScanContext.Builder contextBuilder = ScanContext.builder(); @@ -108,6 +114,10 @@ public Builder limit(long newLimit) { } public Builder properties(Map properties) { + if (properties.containsKey("snapshot-id")) { + scanSnapshotId = Long.parseLong(properties.get("snapshot-id")); + } + contextBuilder.fromProperties(properties); return this; } @@ -118,6 +128,7 @@ public Builder caseSensitive(boolean caseSensitive) { } public Builder snapshotId(Long snapshotId) { + scanSnapshotId = snapshotId; contextBuilder.useSnapshotId(snapshotId); 
return this; } @@ -178,27 +189,52 @@ public FlinkInputFormat buildFormat() { tableLoader.open(); try (TableLoader loader = tableLoader) { table = loader.loadTable(); - icebergSchema = table.schema(); io = table.io(); encryption = table.encryption(); } catch (IOException e) { throw new UncheckedIOException(e); } } else { - icebergSchema = table.schema(); io = table.io(); encryption = table.encryption(); } + if (table instanceof HasTableOperations && scanSnapshotId != null) { + TableMetadata tableMetadata = ((HasTableOperations) table).operations().current(); + // fail fast with a clear message rather than an NPE when the requested snapshot is not in the metadata + Preconditions.checkArgument(tableMetadata.snapshot(scanSnapshotId) != null, + "Cannot find snapshot with ID %s", scanSnapshotId); + int schemaId = tableMetadata.snapshot(scanSnapshotId).schemaId(); + icebergSchema = tableMetadata.schemasById().get(schemaId); + } else { + icebergSchema = table.schema(); + } + if (projectedSchema == null) { contextBuilder.project(icebergSchema); } else { - contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema)); + contextBuilder.project(getProjectedSchema(icebergSchema, projectedSchema)); } return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build()); } + private Schema getProjectedSchema(Schema icebergSchema, TableSchema projectSchema) { + List projectFields = FlinkSchemaUtil.convert(projectSchema).columns(); + + List newFields = Lists.newArrayList(); + for (Types.NestedField nestedField : projectFields) { + if (icebergSchema.findType(nestedField.name()) != null) { + newFields.add(nestedField); + } + } + + // reassign ids to match the base schema + Schema schema = TypeUtil.reassignIds(new Schema(newFields), icebergSchema); + // fix types that can't be represented in Flink (UUID) + return FlinkSchemaUtil.fixup(schema, icebergSchema); + } + public DataStream build() { Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); FlinkInputFormat format = buildFormat(); diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java 
b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java index eae3233a6546..d20bfe94d9ca 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java @@ -30,10 +30,12 @@ import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; import org.junit.Test; @@ -126,6 +128,103 @@ public void testBasicProjection() throws IOException { TestHelpers.assertRows(result, expected); } + @Test + public void testSnapshotReadsWithAddColumn() throws Exception { + Schema writeSchema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(1, "data", Types.StringType.get()) + ); + + Table table = catalog.createTable(TableIdentifier.of("default", "t"), writeSchema); + List writeRecords = RandomGenericData.generate(writeSchema, 2, 0L); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(writeRecords); + + table.refresh(); + long snapshotId = table.currentSnapshot().snapshotId(); + long timestampMillis = table.currentSnapshot().timestampMillis(); + waitUntilAfter(timestampMillis); + + table.updateSchema().addColumn("d", Types.IntegerType.get()).commit(); + + Schema writeSchema1 = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(1, "data", Types.StringType.get()), + Types.NestedField.optional(2, "d", Types.IntegerType.get()) + ); + + List 
writeRecords1 = RandomGenericData.generate(writeSchema1, 2, 0L); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(writeRecords1); + + table.refresh(); + long snapshotId1 = table.currentSnapshot().snapshotId(); + long timestampMillis1 = table.currentSnapshot().timestampMillis(); + waitUntilAfter(timestampMillis1); + + TestHelpers.assertRecords( + runWithOptions(ImmutableMap.of("snapshot-id", Long.toString(snapshotId))), + writeRecords, writeSchema); + + for (Record record : writeRecords) { + GenericRecord genericRecord = GenericRecord.create(writeSchema1); + genericRecord.set(0, record.get(0)); + genericRecord.set(1, record.get(1)); + genericRecord.set(2, null); + writeRecords1.add(genericRecord); + } + + TestHelpers.assertRecords( + runWithOptions(ImmutableMap.of("snapshot-id", Long.toString(snapshotId1))), + writeRecords1, writeSchema1); + } + + @Test + public void testSnapshotReadsWithDeleteColumn() throws Exception { + Schema writeSchema = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(1, "data", Types.StringType.get()), + Types.NestedField.optional(2, "d", Types.IntegerType.get()) + ); + + Table table = catalog.createTable(TableIdentifier.of("default", "t"), writeSchema); + List writeRecords = RandomGenericData.generate(writeSchema, 2, 0L); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(writeRecords); + + table.refresh(); + long snapshotId = table.currentSnapshot().snapshotId(); + long timestampMillis = table.currentSnapshot().timestampMillis(); + waitUntilAfter(timestampMillis); + + table.updateSchema().deleteColumn("d").commit(); + + Schema writeSchema1 = new Schema( + Types.NestedField.required(0, "id", Types.LongType.get()), + Types.NestedField.optional(1, "data", Types.StringType.get()) + ); + + List writeRecords1 = RandomGenericData.generate(writeSchema1, 2, 0L); + new GenericAppenderHelper(table, fileFormat, 
TEMPORARY_FOLDER).appendToTable(writeRecords1); + + table.refresh(); + long snapshotId1 = table.currentSnapshot().snapshotId(); + long timestampMillis1 = table.currentSnapshot().timestampMillis(); + waitUntilAfter(timestampMillis1); + + TestHelpers.assertRecords( + runWithOptions(ImmutableMap.of("snapshot-id", Long.toString(snapshotId))), + writeRecords, writeSchema); + + for (Record record : writeRecords) { + GenericRecord genericRecord = GenericRecord.create(writeSchema1); + genericRecord.set(0, record.get(0)); + genericRecord.set(1, record.get(1)); + writeRecords1.add(genericRecord); + } + + TestHelpers.assertRecords( + runWithOptions(ImmutableMap.of("snapshot-id", Long.toString(snapshotId1))), + writeRecords1, writeSchema1); + } + private List runFormat(FlinkInputFormat inputFormat) throws IOException { RowType rowType = FlinkSchemaUtil.convert(inputFormat.projectedSchema()); return TestHelpers.readRows(inputFormat, rowType); diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java index b9c7d2c20ba4..f976e977e648 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java @@ -311,7 +311,7 @@ private static void assertRows(List results, Row... expected) { TestHelpers.assertRows(results, Arrays.asList(expected)); } - private static void waitUntilAfter(long timestampMillis) { + protected static void waitUntilAfter(long timestampMillis) { long current = System.currentTimeMillis(); while (current <= timestampMillis) { current = System.currentTimeMillis();