Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,29 @@ default Iterable<DeleteFile> removedDeleteFiles(FileIO io) {
default Integer schemaId() {
return null;
}

/**
* The row-id of the first newly added row in this snapshot. All rows added in this snapshot will
* have a row-id assigned to them greater than this value. All rows with a row-id less than this
* value were created in a snapshot that was added to the table (but not necessarily commited to
* this branch) in the past.
*
* @return the first row-id to be used in this snapshot or null if row lineage was not enabled
* when the table was created.
*/
default Long firstRowId() {
return null;
}

/**
* The total number of newly added rows in this snapshot. It should be the summation of {@link
* ManifestFile#ADDED_ROWS_COUNT} for every manifest added in this snapshot.
*
* <p>This field is optional but is required when row lineage is enabled.
*
* @return the total number of new rows in this snapshot or null if the value was not stored.
*/
default Long addedRows() {
return null;
}
}
20 changes: 19 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class BaseSnapshot implements Snapshot {
private final Map<String, String> summary;
private final Integer schemaId;
private final String[] v1ManifestLocations;
private final Long firstRowId;
private final Long addedRows;

// lazily initialized
private transient List<ManifestFile> allManifests = null;
Expand All @@ -61,7 +63,9 @@ class BaseSnapshot implements Snapshot {
String operation,
Map<String, String> summary,
Integer schemaId,
String manifestList) {
String manifestList,
Long firstRowId,
Long addedRows) {
this.sequenceNumber = sequenceNumber;
this.snapshotId = snapshotId;
this.parentId = parentId;
Expand All @@ -71,6 +75,8 @@ class BaseSnapshot implements Snapshot {
this.schemaId = schemaId;
this.manifestListLocation = manifestList;
this.v1ManifestLocations = null;
this.firstRowId = firstRowId;
this.addedRows = addedRows;
}

BaseSnapshot(
Expand All @@ -91,6 +97,8 @@ class BaseSnapshot implements Snapshot {
this.schemaId = schemaId;
this.manifestListLocation = null;
this.v1ManifestLocations = v1ManifestLocations;
this.firstRowId = null;
this.addedRows = null;
}

@Override
Expand Down Expand Up @@ -128,6 +136,16 @@ public Integer schemaId() {
return schemaId;
}

@Override
public Long firstRowId() {
return firstRowId;
}

@Override
public Long addedRows() {
return addedRows;
}

private void cacheManifests(FileIO fileIO) {
if (fileIO == null) {
throw new IllegalArgumentException("Cannot cache changes: FileIO is null");
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -517,4 +517,11 @@ public void applyTo(ViewMetadata.Builder viewMetadataBuilder) {
viewMetadataBuilder.setCurrentVersionId(versionId);
}
}

class EnableRowLineage implements MetadataUpdate {
@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.enableRowLineage();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ private MetadataUpdateParser() {}
static final String SET_PARTITION_STATISTICS = "set-partition-statistics";
static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics";
static final String REMOVE_PARTITION_SPECS = "remove-partition-specs";
static final String ENABLE_ROW_LINEAGE = "enable-row-lineage";

// AssignUUID
private static final String UUID = "uuid";
Expand Down Expand Up @@ -154,6 +155,7 @@ private MetadataUpdateParser() {}
.put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION)
.put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION)
.put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS)
.put(MetadataUpdate.EnableRowLineage.class, ENABLE_ROW_LINEAGE)
.buildOrThrow();

public static String toJson(MetadataUpdate metadataUpdate) {
Expand Down Expand Up @@ -249,6 +251,8 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
case REMOVE_PARTITION_SPECS:
writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) metadataUpdate, generator);
break;
case ENABLE_ROW_LINEAGE:
break;
default:
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -322,6 +326,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {
return readCurrentViewVersionId(jsonNode);
case REMOVE_PARTITION_SPECS:
return readRemovePartitionSpecs(jsonNode);
case ENABLE_ROW_LINEAGE:
return new MetadataUpdate.EnableRowLineage();
default:
throw new UnsupportedOperationException(
String.format("Cannot convert metadata update action to json: %s", action));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ public static TableMetadata replacePaths(
// TODO: update statistic file paths
metadata.statisticsFiles(),
metadata.partitionStatisticsFiles(),
metadata.changes());
metadata.changes(),
metadata.rowLineageEnabled(),
metadata.nextRowId());
}

private static Map<String, String> updateProperties(
Expand Down Expand Up @@ -186,7 +188,9 @@ private static List<Snapshot> updatePathInSnapshots(
snapshot.operation(),
snapshot.summary(),
snapshot.schemaId(),
newManifestListLocation);
newManifestListLocation,
snapshot.firstRowId(),
snapshot.addedRows());
newSnapshots.add(newSnapshot);
}
return newSnapshots;
Expand Down
17 changes: 16 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ private SnapshotParser() {}
private static final String MANIFESTS = "manifests";
private static final String MANIFEST_LIST = "manifest-list";
private static final String SCHEMA_ID = "schema-id";
private static final String FIRST_ROW_ID = "first-row-id";
private static final String ADDED_ROWS = "added-rows";

static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException {
generator.writeStartObject();
Expand Down Expand Up @@ -96,6 +98,14 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOExceptio
generator.writeNumberField(SCHEMA_ID, snapshot.schemaId());
}

if (snapshot.firstRowId() != null) {
generator.writeNumberField(FIRST_ROW_ID, snapshot.firstRowId());
}

if (snapshot.addedRows() != null) {
generator.writeNumberField(ADDED_ROWS, snapshot.addedRows());
}

generator.writeEndObject();
}

Expand Down Expand Up @@ -158,6 +168,9 @@ static Snapshot fromJson(JsonNode node) {

Integer schemaId = JsonUtil.getIntOrNull(SCHEMA_ID, node);

Long firstRowId = JsonUtil.getLongOrNull(FIRST_ROW_ID, node);
Long addedRows = JsonUtil.getLongOrNull(ADDED_ROWS, node);

if (node.has(MANIFEST_LIST)) {
// the manifest list is stored in a manifest list file
String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
Expand All @@ -169,7 +182,9 @@ static Snapshot fromJson(JsonNode node) {
operation,
summary,
schemaId,
manifestList);
manifestList,
firstRowId,
addedRows);

} else {
// fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
Expand Down
30 changes: 29 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -282,6 +283,13 @@ public Snapshot apply() {
throw new RuntimeIOException(e, "Failed to write manifest list file");
}

Long addedRows = null;
Long lastRowId = null;
if (base.rowLineageEnabled()) {
addedRows = calculateAddedRows(manifests);
lastRowId = base.nextRowId();
}

return new BaseSnapshot(
sequenceNumber,
snapshotId(),
Expand All @@ -290,7 +298,27 @@ public Snapshot apply() {
operation(),
summary(base),
base.currentSchemaId(),
manifestList.location());
manifestList.location(),
lastRowId,
addedRows);
}

private Long calculateAddedRows(List<ManifestFile> manifests) {
return manifests.stream()
.filter(
manifest ->
manifest.snapshotId() == null
|| Objects.equals(manifest.snapshotId(), this.snapshotId))
.mapToLong(
manifest -> {
Preconditions.checkArgument(
manifest.addedRowsCount() != null,
"Cannot determine number of added rows in snapshot because"
+ " the entry for manifest %s is missing the field `added-rows-count`",
manifest.path());
return manifest.addedRowsCount();
})
.sum();
}

protected abstract Map<String, String> summary();
Expand Down
Loading