Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,11 @@ public interface Snapshot extends Serializable {
* @return the location of the manifest list for this Snapshot
*/
String manifestListLocation();

/**
* Return the schema id of the snapshot
*
* @return the schema id
*/
int schemaId();
}
17 changes: 13 additions & 4 deletions core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class BaseSnapshot implements Snapshot {
private final String manifestListLocation;
private final String operation;
private final Map<String, String> summary;
private final int schemaId;

// lazily initialized
private transient List<ManifestFile> allManifests = null;
Expand All @@ -59,7 +60,7 @@ class BaseSnapshot implements Snapshot {
String... manifestFiles) {
this(io, snapshotId, null, System.currentTimeMillis(), null, null,
Lists.transform(Arrays.asList(manifestFiles),
path -> new GenericManifestFile(io.newInputFile(path), 0)));
path -> new GenericManifestFile(io.newInputFile(path), 0)), 0);
}

BaseSnapshot(FileIO io,
Expand All @@ -69,7 +70,8 @@ class BaseSnapshot implements Snapshot {
long timestampMillis,
String operation,
Map<String, String> summary,
String manifestList) {
String manifestList,
int schemaId) {
this.io = io;
this.sequenceNumber = sequenceNumber;
this.snapshotId = snapshotId;
Expand All @@ -78,6 +80,7 @@ class BaseSnapshot implements Snapshot {
this.operation = operation;
this.summary = summary;
this.manifestListLocation = manifestList;
this.schemaId = schemaId;
}

BaseSnapshot(FileIO io,
Expand All @@ -86,8 +89,9 @@ class BaseSnapshot implements Snapshot {
long timestampMillis,
String operation,
Map<String, String> summary,
List<ManifestFile> dataManifests) {
this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, null);
List<ManifestFile> dataManifests,
int schemaId) {
this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, null, schemaId);
this.allManifests = dataManifests;
}

Expand Down Expand Up @@ -180,6 +184,11 @@ public String manifestListLocation() {
return manifestListLocation;
}

@Override
public int schemaId() {
return schemaId;
}

private void cacheChanges() {
ImmutableList.Builder<DataFile> adds = ImmutableList.builder();
ImmutableList.Builder<DataFile> deletes = ImmutableList.builder();
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,11 @@ public TableScan useSnapshot(long scanSnapshotId) {
"Cannot override snapshot, already set to id=%s", context.snapshotId());
Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null,
"Cannot find snapshot with ID %s", scanSnapshotId);

int schemaId = ops.current().snapshot(scanSnapshotId).schemaId();

return newRefinedScan(
ops, table, schema, context.useSnapshotId(scanSnapshotId));
ops, table, ops.current().schemasById().get(schemaId), context.useSnapshotId(scanSnapshotId));
}

@Override
Expand Down
8 changes: 6 additions & 2 deletions core/src/main/java/org/apache/iceberg/SnapshotParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ private SnapshotParser() {
private static final String OPERATION = "operation";
private static final String MANIFESTS = "manifests";
private static final String MANIFEST_LIST = "manifest-list";
private static final String SCHEMA_ID = "schema-id";

static void toJson(Snapshot snapshot, JsonGenerator generator)
throws IOException {
Expand All @@ -58,6 +59,7 @@ static void toJson(Snapshot snapshot, JsonGenerator generator)
generator.writeNumberField(PARENT_SNAPSHOT_ID, snapshot.parentId());
}
generator.writeNumberField(TIMESTAMP_MS, snapshot.timestampMillis());
generator.writeNumberField(SCHEMA_ID, snapshot.schemaId());

// if there is an operation, write the summary map
if (snapshot.operation() != null) {
Expand Down Expand Up @@ -119,6 +121,7 @@ static Snapshot fromJson(FileIO io, JsonNode node) {
}
long timestamp = JsonUtil.getLong(TIMESTAMP_MS, node);

int schemaId = JsonUtil.getInt(SCHEMA_ID, node);
Map<String, String> summary = null;
String operation = null;
if (node.has(SUMMARY)) {
Expand All @@ -142,14 +145,15 @@ static Snapshot fromJson(FileIO io, JsonNode node) {
if (node.has(MANIFEST_LIST)) {
// the manifest list is stored in a manifest list file
String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
return new BaseSnapshot(io, sequenceNumber, snapshotId, parentId, timestamp, operation, summary, manifestList);
return new BaseSnapshot(io, sequenceNumber, snapshotId, parentId, timestamp, operation, summary, manifestList,
schemaId);

} else {
// fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
// loaded lazily, if it is needed
List<ManifestFile> manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
location -> new GenericManifestFile(io.newInputFile(location), 0));
return new BaseSnapshot(io, snapshotId, parentId, timestamp, operation, summary, manifests);
return new BaseSnapshot(io, snapshotId, parentId, timestamp, operation, summary, manifests, schemaId);
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,12 @@ public Snapshot apply() {

return new BaseSnapshot(ops.io(),
sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
manifestList.location());
manifestList.location(), base.currentSchemaId());

} else {
return new BaseSnapshot(ops.io(),
snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
manifests);
manifests, base.currentSchemaId());
}
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testJsonConversionWithOperation() {

Snapshot expected = new BaseSnapshot(ops.io(), id, parentId, System.currentTimeMillis(),
DataOperations.REPLACE, ImmutableMap.of("files-added", "4", "files-deleted", "100"),
manifests);
manifests, 0);

String json = SnapshotParser.toJson(expected);
Snapshot snapshot = SnapshotParser.fromJson(ops.io(), json);
Expand Down Expand Up @@ -102,9 +102,9 @@ public void testJsonConversionWithManifestList() throws IOException {
}

Snapshot expected = new BaseSnapshot(
ops.io(), id, 34, parentId, System.currentTimeMillis(), null, null, localInput(manifestList).location());
ops.io(), id, 34, parentId, System.currentTimeMillis(), null, null, localInput(manifestList).location(), 0);
Snapshot inMemory = new BaseSnapshot(
ops.io(), id, parentId, expected.timestampMillis(), null, null, manifests);
ops.io(), id, parentId, expected.timestampMillis(), null, null, manifests, 0);

Assert.assertEquals("Files should match in memory list",
inMemory.allManifests(), expected.allManifests());
Expand Down
26 changes: 13 additions & 13 deletions core/src/test/java/org/apache/iceberg/TestTableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ public void testJsonConversion() throws Exception {
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), 0);
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())), 0);

List<HistoryEntry> snapshotLog = ImmutableList.<HistoryEntry>builder()
.add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId()))
Expand Down Expand Up @@ -169,11 +169,11 @@ public void testBackwardCompat() throws Exception {
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())), 0);
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())), 0);

TableMetadata expected = new TableMetadata(null, 1, null, TEST_LOCATION,
0, System.currentTimeMillis(), 3, TableMetadata.INITIAL_SCHEMA_ID,
Expand Down Expand Up @@ -228,7 +228,7 @@ public void testBackwardCompat() throws Exception {
previousSnapshot.allManifests(),
metadata.snapshot(previousSnapshotId).allManifests());
Assert.assertEquals("Snapshot logs should match",
expected.previousFiles(), metadata.previousFiles());
expected.previousFiles(), metadata.previousFiles());
}

private static String toJsonWithoutSpecAndSchemaList(TableMetadata metadata) {
Expand Down Expand Up @@ -281,11 +281,11 @@ public void testJsonWithPreviousMetadataLog() throws Exception {
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), 0);
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())), 0);

List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
long currentTimestamp = System.currentTimeMillis();
Expand All @@ -312,11 +312,11 @@ public void testAddPreviousMetadataRemoveNone() {
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), 0);
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())), 0);

List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
long currentTimestamp = System.currentTimeMillis();
Expand Down Expand Up @@ -352,11 +352,11 @@ public void testAddPreviousMetadataRemoveOne() {
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), 0);
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())), 0);

List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
long currentTimestamp = System.currentTimeMillis();
Expand Down Expand Up @@ -404,11 +404,11 @@ public void testAddPreviousMetadataRemoveMultiple() {
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())), 0);
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())), 0);

List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
long currentTimestamp = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public static Schema convert(Schema baseSchema, TableSchema flinkSchema) {
return FlinkFixupTypes.fixup(schema, baseSchema);
}

public static Schema fixup(Schema schema, Schema baseSchema) {
return FlinkFixupTypes.fixup(schema, baseSchema);
}

/**
* Convert a {@link Schema} to a {@link RowType Flink type}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expression;
Expand All @@ -42,6 +44,9 @@
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class FlinkSource {
private FlinkSource() {
Expand Down Expand Up @@ -74,6 +79,7 @@ public static class Builder {
private Table table;
private TableLoader tableLoader;
private TableSchema projectedSchema;
private Long scanSnapshotId;
private ReadableConfig readableConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();

Expand Down Expand Up @@ -108,6 +114,10 @@ public Builder limit(long newLimit) {
}

public Builder properties(Map<String, String> properties) {
if (properties.containsKey("snapshot-id")) {
scanSnapshotId = Long.parseLong(properties.get("snapshot-id"));
}

contextBuilder.fromProperties(properties);
return this;
}
Expand All @@ -118,6 +128,7 @@ public Builder caseSensitive(boolean caseSensitive) {
}

public Builder snapshotId(Long snapshotId) {
scanSnapshotId = snapshotId;
contextBuilder.useSnapshotId(snapshotId);
return this;
}
Expand Down Expand Up @@ -178,27 +189,49 @@ public FlinkInputFormat buildFormat() {
tableLoader.open();
try (TableLoader loader = tableLoader) {
table = loader.loadTable();
icebergSchema = table.schema();
io = table.io();
encryption = table.encryption();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
} else {
icebergSchema = table.schema();
io = table.io();
encryption = table.encryption();
}

if (table instanceof HasTableOperations && scanSnapshotId != null) {
TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();
int schemaId = tableMetadata.snapshot(scanSnapshotId).schemaId();
icebergSchema = tableMetadata.schemasById().get(schemaId);
} else {
icebergSchema = table.schema();
}

if (projectedSchema == null) {
contextBuilder.project(icebergSchema);
} else {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
contextBuilder.project(getProjectedSchema(icebergSchema, projectedSchema));
}

return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build());
}

private Schema getProjectedSchema(Schema icebergSchema, TableSchema projectSchema) {
List<Types.NestedField> projectFields = FlinkSchemaUtil.convert(projectSchema).columns();

List<Types.NestedField> newFields = Lists.newArrayList();
for (Types.NestedField nestedField : projectFields) {
if (icebergSchema.findType(nestedField.name()) != null) {
newFields.add(nestedField);
}
}

// reassign ids to match the base schema
Schema schema = TypeUtil.reassignIds(new Schema(newFields), icebergSchema);
// fix types that can't be represented in Flink (UUID)
return FlinkSchemaUtil.fixup(schema, icebergSchema);
}

public DataStream<RowData> build() {
Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
FlinkInputFormat format = buildFormat();
Expand Down
Loading