Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5ced9a1
Core: Update manifest lists to support firstRowId.
rdblue Mar 25, 2025
59063a5
Core: Update manifests to support firstRowId.
rdblue Mar 25, 2025
af5e7bd
Implement and test DataFile inheritance.
rdblue Mar 27, 2025
9f178c2
Update TestManifestListVersions.
rdblue Mar 27, 2025
1220087
Update row lineage tests.
rdblue Mar 28, 2025
8295c04
Core: Fail if a data file does not have a first-row-id.
rdblue Mar 28, 2025
7fba446
Only pass non-null next-row-id for v3 tables.
rdblue Mar 28, 2025
a6f5b9e
Apply spotless.
rdblue Mar 28, 2025
463f9ab
Fix benchmarks.
rdblue Mar 28, 2025
cd4c413
Fix TestManifestReader.
rdblue Mar 28, 2025
5a1f72b
Fix checkstyle.
rdblue Mar 28, 2025
59a7295
Fix TestFilesTableTaskParser.
rdblue Mar 28, 2025
74be1a2
Fix complexity in ManifestFileParser.
rdblue Mar 31, 2025
dd2c853
Fix TestRewriteTablePathProcedure.
rdblue Apr 1, 2025
a33ce90
Fix review comments from Russell.
rdblue Apr 9, 2025
a4daebf
Leave ID space for existing rows.
rdblue Apr 10, 2025
ba47da4
Apply spotless.
rdblue Apr 11, 2025
08a0f27
Fix Spark metadata table tests.
rdblue Apr 11, 2025
5dbff22
Fix Spark actions that do not project record_count.
rdblue Apr 11, 2025
31784d9
Fix typo: assumeThat instead of assertThat.
rdblue Apr 11, 2025
c8a0d52
Fix TestManifestReaderStats.
rdblue Apr 14, 2025
61bbf59
Fix TestMetadataTablesWithPartitionEvolution.
rdblue Apr 14, 2025
fd4cf98
Fix inheritance.
rdblue Apr 16, 2025
c36ad53
Fix TestTables and checkstyle.
rdblue Apr 16, 2025
03bada9
Add test cases with TestTables.
rdblue Apr 16, 2025
ff8ac0f
Apply spotless.
rdblue Apr 16, 2025
b4addec
Fix checkstyle.
rdblue Apr 16, 2025
f990fb8
Apply spotless.
rdblue Apr 16, 2025
bdfc6ce
Changes from reviews.
rdblue Apr 17, 2025
b0d2230
Core: Update ContentFileParser to produce first-row-id.
rdblue Apr 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/src/main/java/org/apache/iceberg/ContentFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ default Long fileSequenceNumber() {
return null;
}

/**
* Returns the starting row ID to assign to new rows in the data file (with _row_id set to null).
*/
default Long firstRowId() {
return null;
}

/**
* Copies this file. Manifest readers can reuse file instances; use this method to copy data when
* collecting files from tasks.
Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/org/apache/iceberg/DataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public interface DataFile extends ContentFile<DataFile> {
Types.NestedField SORT_ORDER_ID =
optional(140, "sort_order_id", IntegerType.get(), "Sort order ID");
Types.NestedField SPEC_ID = optional(141, "spec_id", IntegerType.get(), "Partition spec ID");
Types.NestedField FIRST_ROW_ID =
optional(142, "first_row_id", LongType.get(), "Starting row ID to assign to new rows");
Types.NestedField REFERENCED_DATA_FILE =
optional(
143,
Expand Down Expand Up @@ -140,6 +142,7 @@ static StructType getType(StructType partitionType) {
SPLIT_OFFSETS,
EQUALITY_IDS,
SORT_ORDER_ID,
FIRST_ROW_ID,
REFERENCED_DATA_FILE,
CONTENT_OFFSET,
CONTENT_SIZE);
Expand Down
16 changes: 14 additions & 2 deletions api/src/main/java/org/apache/iceberg/ManifestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ public interface ManifestFile {
"Summary for each partition");
Types.NestedField KEY_METADATA =
optional(519, "key_metadata", Types.BinaryType.get(), "Encryption key metadata blob");
// next ID to assign: 520
Types.NestedField FIRST_ROW_ID =
optional(
520,
"first_row_id",
Types.LongType.get(),
"Starting row ID to assign to new rows in ADDED data files");
// next ID to assign: 521

Schema SCHEMA =
new Schema(
Expand All @@ -107,7 +113,8 @@ public interface ManifestFile {
EXISTING_ROWS_COUNT,
DELETED_ROWS_COUNT,
PARTITION_SUMMARIES,
KEY_METADATA);
KEY_METADATA,
FIRST_ROW_ID);

static Schema schema() {
return SCHEMA;
Expand Down Expand Up @@ -198,6 +205,11 @@ default ByteBuffer keyMetadata() {
return null;
}

/** Returns the starting row ID to assign to new rows in ADDED data files. */
default Long firstRowId() {
return null;
}

/**
* Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use
* this method to make defensive copies.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public void before() {
Random random = new Random(System.currentTimeMillis());

try (ManifestListWriter listWriter =
ManifestLists.write(1, org.apache.iceberg.Files.localOutput(manifestListFile), 0, 1L, 0)) {
ManifestLists.write(
1, org.apache.iceberg.Files.localOutput(manifestListFile), 0, 1L, 0, 0L)) {
for (int i = 0; i < NUM_FILES; i++) {
OutputFile manifestFile =
org.apache.iceberg.Files.localOutput(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public void writeManifestFile(BenchmarkState state) throws IOException {
org.apache.iceberg.Files.localOutput(manifestListFile),
0,
1L,
0)) {
0,
0L)) {
for (int i = 0; i < NUM_FILES; i++) {
OutputFile manifestFile =
org.apache.iceberg.Files.localOutput(
Expand Down
32 changes: 26 additions & 6 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public PartitionData copy() {
private int[] equalityIds = null;
private byte[] keyMetadata = null;
private Integer sortOrderId;
private Long firstRowId = null;
private String referencedDataFile = null;
private Long contentOffset = null;
private Long contentSizeInBytes = null;
Expand Down Expand Up @@ -111,6 +112,7 @@ public PartitionData copy() {
DataFile.SPLIT_OFFSETS,
DataFile.EQUALITY_IDS,
DataFile.SORT_ORDER_ID,
DataFile.FIRST_ROW_ID,
DataFile.REFERENCED_DATA_FILE,
DataFile.CONTENT_OFFSET,
DataFile.CONTENT_SIZE,
Expand Down Expand Up @@ -156,6 +158,7 @@ public PartitionData copy() {
int[] equalityFieldIds,
Integer sortOrderId,
ByteBuffer keyMetadata,
Long firstRowId,
String referencedDataFile,
Long contentOffset,
Long contentSizeInBytes) {
Expand Down Expand Up @@ -187,6 +190,7 @@ public PartitionData copy() {
this.equalityIds = equalityFieldIds;
this.sortOrderId = sortOrderId;
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
this.firstRowId = firstRowId;
this.referencedDataFile = referencedDataFile;
this.contentOffset = contentOffset;
this.contentSizeInBytes = contentSizeInBytes;
Expand Down Expand Up @@ -242,6 +246,7 @@ public PartitionData copy() {
this.sortOrderId = toCopy.sortOrderId;
this.dataSequenceNumber = toCopy.dataSequenceNumber;
this.fileSequenceNumber = toCopy.fileSequenceNumber;
this.firstRowId = toCopy.firstRowId;
this.referencedDataFile = toCopy.referencedDataFile;
this.contentOffset = toCopy.contentOffset;
this.contentSizeInBytes = toCopy.contentSizeInBytes;
Expand Down Expand Up @@ -283,6 +288,15 @@ public void setFileSequenceNumber(Long fileSequenceNumber) {
this.fileSequenceNumber = fileSequenceNumber;
}

@Override
public Long firstRowId() {
return firstRowId;
}

public void setFirstRowId(long firstRowId) {
this.firstRowId = firstRowId;
}

protected abstract Schema getAvroSchema(Types.StructType partitionStruct);

@Override
Expand Down Expand Up @@ -354,15 +368,18 @@ protected <T> void internalSet(int pos, T value) {
this.sortOrderId = (Integer) value;
return;
case 17:
this.referencedDataFile = value != null ? value.toString() : null;
this.firstRowId = (Long) value;
return;
case 18:
this.contentOffset = (Long) value;
this.referencedDataFile = value != null ? value.toString() : null;
return;
case 19:
this.contentSizeInBytes = (Long) value;
this.contentOffset = (Long) value;
return;
case 20:
this.contentSizeInBytes = (Long) value;
return;
case 21:
this.fileOrdinal = (long) value;
return;
default:
Expand Down Expand Up @@ -412,12 +429,14 @@ private Object getByPos(int basePos) {
case 16:
return sortOrderId;
case 17:
return referencedDataFile;
return firstRowId;
case 18:
return contentOffset;
return referencedDataFile;
case 19:
return contentSizeInBytes;
return contentOffset;
case 20:
return contentSizeInBytes;
case 21:
return fileOrdinal;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + basePos);
Expand Down Expand Up @@ -607,6 +626,7 @@ public String toString() {
.add("sort_order_id", sortOrderId)
.add("data_sequence_number", dataSequenceNumber == null ? "null" : dataSequenceNumber)
.add("file_sequence_number", fileSequenceNumber == null ? "null" : fileSequenceNumber)
.add("first_row_id", firstRowId == null ? "null" : firstRowId)
.add("referenced_data_file", referencedDataFile == null ? "null" : referencedDataFile)
.add("content_offset", contentOffset == null ? "null" : contentOffset)
.add("content_size_in_bytes", contentSizeInBytes == null ? "null" : contentSizeInBytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ private ManifestFile copyManifest(ManifestFile manifest) {
return ManifestFiles.copyRewriteManifest(
current.formatVersion(),
manifest.partitionSpecId(),
manifest.firstRowId(),
toCopy,
specsById,
newFile,
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/java/org/apache/iceberg/ContentFileParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ContentFileParser {
private static final String SPLIT_OFFSETS = "split-offsets";
private static final String EQUALITY_IDS = "equality-ids";
private static final String SORT_ORDER_ID = "sort-order-id";
private static final String FIRST_ROW_ID = "first-row-id";
private static final String REFERENCED_DATA_FILE = "referenced-data-file";
private static final String CONTENT_OFFSET = "content-offset";
private static final String CONTENT_SIZE = "content-size-in-bytes";
Expand Down Expand Up @@ -112,6 +113,8 @@ static void toJson(ContentFile<?> contentFile, PartitionSpec spec, JsonGenerator
generator.writeNumberField(SORT_ORDER_ID, contentFile.sortOrderId());
}

JsonUtil.writeLongFieldIfPresent(FIRST_ROW_ID, contentFile.firstRowId(), generator);

if (contentFile instanceof DeleteFile) {
DeleteFile deleteFile = (DeleteFile) contentFile;

Expand Down Expand Up @@ -164,6 +167,7 @@ static ContentFile<?> fromJson(JsonNode jsonNode, PartitionSpec spec) {
List<Long> splitOffsets = JsonUtil.getLongListOrNull(SPLIT_OFFSETS, jsonNode);
int[] equalityFieldIds = JsonUtil.getIntArrayOrNull(EQUALITY_IDS, jsonNode);
Integer sortOrderId = JsonUtil.getIntOrNull(SORT_ORDER_ID, jsonNode);
Long firstRowId = JsonUtil.getLongOrNull(FIRST_ROW_ID, jsonNode);
String referencedDataFile = JsonUtil.getStringOrNull(REFERENCED_DATA_FILE, jsonNode);
Long contentOffset = JsonUtil.getLongOrNull(CONTENT_OFFSET, jsonNode);
Long contentSizeInBytes = JsonUtil.getLongOrNull(CONTENT_SIZE, jsonNode);
Expand All @@ -178,7 +182,8 @@ static ContentFile<?> fromJson(JsonNode jsonNode, PartitionSpec spec) {
metrics,
keyMetadata,
splitOffsets,
sortOrderId);
sortOrderId,
firstRowId);
} else {
return new GenericDeleteFile(
specId,
Expand Down
11 changes: 10 additions & 1 deletion core/src/main/java/org/apache/iceberg/DataFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public static class Builder {
private ByteBuffer keyMetadata = null;
private List<Long> splitOffsets = null;
private Integer sortOrderId = SortOrder.unsorted().orderId();
private Long firstRowId = null;

public Builder(PartitionSpec spec) {
this.spec = spec;
Expand All @@ -178,6 +179,7 @@ public void clear() {
this.upperBounds = null;
this.splitOffsets = null;
this.sortOrderId = SortOrder.unsorted().orderId();
this.firstRowId = null;
}

public Builder copy(DataFile toCopy) {
Expand All @@ -201,6 +203,7 @@ public Builder copy(DataFile toCopy) {
this.splitOffsets =
toCopy.splitOffsets() == null ? null : ImmutableList.copyOf(toCopy.splitOffsets());
this.sortOrderId = toCopy.sortOrderId();
this.firstRowId = toCopy.firstRowId();
return this;
}

Expand Down Expand Up @@ -315,6 +318,11 @@ public Builder withSortOrder(SortOrder newSortOrder) {
return this;
}

public Builder withFirstRowId(Long nextRowId) {
this.firstRowId = nextRowId;
return this;
}

public DataFile build() {
Preconditions.checkArgument(filePath != null, "File path is required");
if (format == null) {
Expand All @@ -340,7 +348,8 @@ public DataFile build() {
upperBounds),
keyMetadata,
splitOffsets,
sortOrderId);
sortOrderId,
firstRowId);
}
}
}
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ private ManifestFile copyManifest(ManifestFile manifest) {
return ManifestFiles.copyAppendManifest(
current.formatVersion(),
manifest.partitionSpecId(),
manifest.firstRowId(),
toCopy,
current.specsById(),
newManifestFile,
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/GenericDataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
Metrics metrics,
ByteBuffer keyMetadata,
List<Long> splitOffsets,
Integer sortOrderId) {
Integer sortOrderId,
Long firstRowId) {
super(
specId,
FileContent.DATA,
Expand All @@ -65,6 +66,7 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
null /* no equality field IDs */,
sortOrderId,
keyMetadata,
firstRowId,
null /* no referenced data file */,
null /* no content offset */,
null /* no content size */);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
equalityFieldIds,
sortOrderId,
keyMetadata,
null /* delete files do not use first-row-id */,
referencedDataFile,
contentOffset,
contentSizeInBytes);
Expand Down
Loading