Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,7 @@ default Integer schemaId() {
* value were created in a snapshot that was added to the table (but not necessarily commited to
* this branch) in the past.
*
* @return the first row-id to be used in this snapshot or null if row lineage was not enabled
* when the table was created.
* @return the first row-id to be used in this snapshot or null when row lineage is not supported
*/
default Long firstRowId() {
return null;
Expand All @@ -189,7 +188,7 @@ default Long firstRowId() {
* The total number of newly added rows in this snapshot. It should be the summation of {@link
* ManifestFile#ADDED_ROWS_COUNT} for every manifest added in this snapshot.
*
* <p>This field is optional but is required when row lineage is enabled.
* <p>This field is optional but is required when the table version supports row lineage.
*
* @return the total number of new rows in this snapshot or null if the value was not stored.
*/
Expand Down
13 changes: 12 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ class BaseSnapshot implements Snapshot {
String manifestList,
Long firstRowId,
Long addedRows) {
Preconditions.checkArgument(
firstRowId == null || firstRowId >= 0,
"Invalid first-row-id (cannot be negative): %s",
firstRowId);
Preconditions.checkArgument(
addedRows == null || addedRows >= 0,
"Invalid added-rows (cannot be negative): %s",
addedRows);
Preconditions.checkArgument(
firstRowId == null || addedRows != null,
"Invalid added-rows (required when first-row-id is set): null");
this.sequenceNumber = sequenceNumber;
this.snapshotId = snapshotId;
this.parentId = parentId;
Expand All @@ -76,7 +87,7 @@ class BaseSnapshot implements Snapshot {
this.manifestListLocation = manifestList;
this.v1ManifestLocations = null;
this.firstRowId = firstRowId;
this.addedRows = addedRows;
this.addedRows = firstRowId != null ? addedRows : null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be slightly more permissive, I'm allowing addedRows to be passed, but it is only propagated if firstRowId is non-null.

}

BaseSnapshot(
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,12 @@ public void applyTo(ViewMetadata.Builder viewMetadataBuilder) {
}
}

/**
* Update to enable row lineage.
*
* @deprecated will be removed in 1.10.0; row lineage is required for all v3+ tables.
*/
@Deprecated
class EnableRowLineage implements MetadataUpdate {
@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ private MetadataUpdateParser() {}
static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics";
static final String REMOVE_PARTITION_SPECS = "remove-partition-specs";
static final String REMOVE_SCHEMAS = "remove-schemas";
static final String ENABLE_ROW_LINEAGE = "enable-row-lineage";

// AssignUUID
private static final String UUID = "uuid";
Expand Down Expand Up @@ -160,7 +159,6 @@ private MetadataUpdateParser() {}
.put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION)
.put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS)
.put(MetadataUpdate.RemoveSchemas.class, REMOVE_SCHEMAS)
.put(MetadataUpdate.EnableRowLineage.class, ENABLE_ROW_LINEAGE)
.buildOrThrow();

public static String toJson(MetadataUpdate metadataUpdate) {
Expand Down Expand Up @@ -259,8 +257,6 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
case REMOVE_SCHEMAS:
writeRemoveSchemas((MetadataUpdate.RemoveSchemas) metadataUpdate, generator);
break;
case ENABLE_ROW_LINEAGE:
break;
default:
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -336,8 +332,6 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {
return readRemovePartitionSpecs(jsonNode);
case REMOVE_SCHEMAS:
return readRemoveSchemas(jsonNode);
case ENABLE_ROW_LINEAGE:
return new MetadataUpdate.EnableRowLineage();
default:
throw new UnsupportedOperationException(
String.format("Cannot convert metadata update action to json: %s", action));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,8 @@ public static TableMetadata replacePaths(
updatePathInStatisticsFiles(metadata.statisticsFiles(), sourcePrefix, targetPrefix),
// TODO: update partition statistics file paths
metadata.partitionStatisticsFiles(),
metadata.changes(),
metadata.rowLineageEnabled(),
metadata.nextRowId());
metadata.nextRowId(),
metadata.changes());
}

private static Map<String, String> updateProperties(
Expand Down
9 changes: 5 additions & 4 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,10 @@ public Snapshot apply() {
}

Long addedRows = null;
Long lastRowId = null;
if (base.rowLineageEnabled()) {
Long firstRowId = null;
if (base.formatVersion() >= 3) {
addedRows = calculateAddedRows(manifests);
lastRowId = base.nextRowId();
firstRowId = base.nextRowId();
}

return new BaseSnapshot(
Expand All @@ -299,7 +299,7 @@ public Snapshot apply() {
summary(base),
base.currentSchemaId(),
manifestList.location(),
lastRowId,
firstRowId,
addedRows);
}

Expand All @@ -309,6 +309,7 @@ private Long calculateAddedRows(List<ManifestFile> manifests) {
manifest ->
manifest.snapshotId() == null
|| Objects.equals(manifest.snapshotId(), this.snapshotId))
.filter(manifest -> manifest.content() == ManifestContent.DATA)
.mapToLong(
manifest -> {
Preconditions.checkArgument(
Expand Down
101 changes: 34 additions & 67 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,11 @@ public class TableMetadata implements Serializable {
static final long INVALID_SEQUENCE_NUMBER = -1;
static final int DEFAULT_TABLE_FORMAT_VERSION = 2;
static final int SUPPORTED_TABLE_FORMAT_VERSION = 3;
static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3;
static final int INITIAL_SPEC_ID = 0;
static final int INITIAL_SORT_ORDER_ID = 1;
static final int INITIAL_SCHEMA_ID = 0;
static final int INITIAL_ROW_ID = 0;
static final boolean DEFAULT_ROW_LINEAGE = false;
static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3;

private static final long ONE_MINUTE = TimeUnit.MINUTES.toMillis(1);

Expand Down Expand Up @@ -133,11 +132,6 @@ static TableMetadata newTableMetadata(
int freshSortOrderId = sortOrder.isUnsorted() ? sortOrder.orderId() : INITIAL_SORT_ORDER_ID;
SortOrder freshSortOrder = freshSortOrder(freshSortOrderId, freshSchema, sortOrder);

// configure row lineage using table properties
Boolean rowLineage =
PropertyUtil.propertyAsBoolean(
properties, TableProperties.ROW_LINEAGE, DEFAULT_ROW_LINEAGE);

// Validate the metrics configuration. Note: we only do this on new tables to we don't
// break existing tables.
MetricsConfig.fromProperties(properties).validateReferencedColumns(schema);
Expand All @@ -151,7 +145,6 @@ static TableMetadata newTableMetadata(
.setDefaultSortOrder(freshSortOrder)
.setLocation(location)
.setProperties(properties)
.setRowLineage(rowLineage)
.build();
}

Expand Down Expand Up @@ -266,13 +259,12 @@ public String toString() {
private final List<StatisticsFile> statisticsFiles;
private final List<PartitionStatisticsFile> partitionStatisticsFiles;
private final List<MetadataUpdate> changes;
private final long nextRowId;
private SerializableSupplier<List<Snapshot>> snapshotsSupplier;
private volatile List<Snapshot> snapshots;
private volatile Map<Long, Snapshot> snapshotsById;
private volatile Map<String, SnapshotRef> refs;
private volatile boolean snapshotsLoaded;
private final Boolean rowLineageEnabled;
private final long nextRowId;

@SuppressWarnings("checkstyle:CyclomaticComplexity")
TableMetadata(
Expand All @@ -299,9 +291,8 @@ public String toString() {
Map<String, SnapshotRef> refs,
List<StatisticsFile> statisticsFiles,
List<PartitionStatisticsFile> partitionStatisticsFiles,
List<MetadataUpdate> changes,
boolean rowLineageEnabled,
long nextRowId) {
long nextRowId,
List<MetadataUpdate> changes) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that this PR also restores the convention that changes are passed last to the TableMetadata constructor. This is because changes are accumulated in TableMetadata.Builder and are not part of the metadata definition. I'd prefer not to mix changes into the actual table metadata.

Preconditions.checkArgument(
specs != null && !specs.isEmpty(), "Partition specs cannot be null or empty");
Preconditions.checkArgument(
Expand All @@ -320,10 +311,6 @@ public String toString() {
Preconditions.checkArgument(
metadataFileLocation == null || changes.isEmpty(),
"Cannot create TableMetadata with a metadata location and changes");
Preconditions.checkArgument(
formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE || !rowLineageEnabled,
"Cannot enable row lineage when Table Version is less than V3. Table Version is %s",
formatVersion);

this.metadataFileLocation = metadataFileLocation;
this.formatVersion = formatVersion;
Expand Down Expand Up @@ -359,7 +346,6 @@ public String toString() {
this.partitionStatisticsFiles = ImmutableList.copyOf(partitionStatisticsFiles);

// row lineage
this.rowLineageEnabled = rowLineageEnabled;
this.nextRowId = nextRowId;

HistoryEntry last = null;
Expand Down Expand Up @@ -584,8 +570,14 @@ public TableMetadata withUUID() {
return new Builder(this).assignUUID().build();
}

/**
* Whether row lineage is enabled.
*
* @deprecated will be removed in 1.10.0; row lineage is required for all v3+ tables.
*/
@Deprecated
public boolean rowLineageEnabled() {
return rowLineageEnabled;
return formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE;
}

public long nextRowId() {
Expand Down Expand Up @@ -634,15 +626,10 @@ public TableMetadata replaceProperties(Map<String, String> rawProperties) {
int newFormatVersion =
PropertyUtil.propertyAsInt(rawProperties, TableProperties.FORMAT_VERSION, formatVersion);

Boolean newRowLineage =
PropertyUtil.propertyAsBoolean(
rawProperties, TableProperties.ROW_LINEAGE, rowLineageEnabled);

return new Builder(this)
.setProperties(updated)
.removeProperties(removed)
.upgradeFormatVersion(newFormatVersion)
.setRowLineage(newRowLineage)
.build();
}

Expand Down Expand Up @@ -927,7 +914,6 @@ public static class Builder {
private final Map<Long, List<StatisticsFile>> statisticsFiles;
private final Map<Long, List<PartitionStatisticsFile>> partitionStatisticsFiles;
private boolean suppressHistoricalSnapshots = false;
private boolean rowLineage;
private long nextRowId;

// change tracking
Expand Down Expand Up @@ -975,7 +961,6 @@ private Builder(int formatVersion) {
this.schemasById = Maps.newHashMap();
this.specsById = Maps.newHashMap();
this.sortOrdersById = Maps.newHashMap();
this.rowLineage = DEFAULT_ROW_LINEAGE;
this.nextRowId = INITIAL_ROW_ID;
}

Expand Down Expand Up @@ -1011,10 +996,25 @@ private Builder(TableMetadata base) {
this.specsById = Maps.newHashMap(base.specsById);
this.sortOrdersById = Maps.newHashMap(base.sortOrdersById);

this.rowLineage = base.rowLineageEnabled;
this.nextRowId = base.nextRowId;
}

/**
* Enables row lineage in v3 tables.
*
* @deprecated will be removed in 1.10.0; row lineage is required for all v3+ tables.
*/
@Deprecated
public Builder enableRowLineage() {
if (formatVersion < MIN_FORMAT_VERSION_ROW_LINEAGE) {
throw new UnsupportedOperationException(
"Cannot enable row lineage for format-version=" + formatVersion);
}

// otherwise this is a no-op
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have expected that this method would return this instead of null as otherwise library users using this method would now get a NPE

}

public Builder withMetadataLocation(String newMetadataLocation) {
this.metadataLocation = newMetadataLocation;
if (null != base) {
Expand Down Expand Up @@ -1269,18 +1269,14 @@ public Builder addSnapshot(Snapshot snapshot) {
snapshotsById.put(snapshot.snapshotId(), snapshot);
changes.add(new MetadataUpdate.AddSnapshot(snapshot));

if (rowLineage) {
if (formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE) {
ValidationException.check(
snapshot.firstRowId() != null, "Cannot add a snapshot: first-row-id is null");
ValidationException.check(
snapshot.firstRowId() >= nextRowId,
"Cannot add a snapshot whose 'first-row-id' (%s) is less than the metadata 'next-row-id' (%s) because this will end up generating duplicate row_ids.",
snapshot.firstRowId() != null && snapshot.firstRowId() >= nextRowId,
"Cannot add a snapshot, first-row-id is behind table next-row-id: %s < %s",
snapshot.firstRowId(),
nextRowId);
ValidationException.check(
snapshot.addedRows() != null,
"Cannot add a snapshot with a null 'added-rows' field when row lineage is enabled");
Preconditions.checkArgument(
snapshot.addedRows() >= 0,
"Cannot decrease 'last-row-id'. 'last-row-id' must increase monotonically. Snapshot reports %s added rows");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RussellSpitzer, I moved these validations in to Snapshot rather than leaving them here. Now this only validates that first-row-id is set for any Snapshot in a v3 table and that the first-row-id is increasing. Both of those can't be checked by the Snapshot itself.

Also, I thought about keeping the checks here rather than moving them to Snapshot so that we can always read metadata, even when written incorrectly. However, when a snapshot is missing first-row-id, it isn't safe to read the snapshot so I think it is okay to fail when reading metadata rather than when using the metadata.


this.nextRowId += snapshot.addedRows();
}
Expand Down Expand Up @@ -1508,34 +1504,6 @@ public Builder setPreviousFileLocation(String previousFileLocation) {
return this;
}

private Builder setRowLineage(Boolean newRowLineage) {
if (newRowLineage == null) {
return this;
}

boolean disablingRowLineage = rowLineage && !newRowLineage;

Preconditions.checkArgument(
!disablingRowLineage, "Cannot disable row lineage once it has been enabled");

if (!rowLineage && newRowLineage) {
return enableRowLineage();
} else {
return this;
}
}

public Builder enableRowLineage() {
Preconditions.checkArgument(
formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE,
"Cannot use row lineage with format version %s. Only format version %s or higher support row lineage",
formatVersion,
MIN_FORMAT_VERSION_ROW_LINEAGE);
this.rowLineage = true;
changes.add(new MetadataUpdate.EnableRowLineage());
return this;
}

private boolean hasChanges() {
return changes.size() != startingChangeCount
|| (discardChanges && !changes.isEmpty())
Expand Down Expand Up @@ -1603,9 +1571,8 @@ public TableMetadata build() {
partitionStatisticsFiles.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList()),
discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes),
rowLineage,
nextRowId);
nextRowId,
discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes));
}

private int addSchemaInternal(Schema schema, int newLastColumnId) {
Expand Down
Loading
Loading