diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index a72fb2c1eca3..3d75052924bd 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -62,11 +62,12 @@ public interface DataFile extends ContentFile<DataFile> {
   Types.NestedField EQUALITY_IDS = optional(135, "equality_ids", ListType.ofRequired(136, IntegerType.get()),
       "Equality comparison field IDs");
   Types.NestedField SORT_ORDER_ID = optional(140, "sort_order_id", IntegerType.get(), "Sort order ID");
+  Types.NestedField SPEC_ID = optional(141, "spec_id", IntegerType.get(), "Partition spec ID");
 
   int PARTITION_ID = 102;
   String PARTITION_NAME = "partition";
   String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
-  // NEXT ID TO ASSIGN: 141
+  // NEXT ID TO ASSIGN: 142
 
   static StructType getType(StructType partitionType) {
     // IDs start at 100 to leave room for changes to ManifestEntry
@@ -74,6 +75,7 @@ static StructType getType(StructType partitionType) {
         CONTENT,
         FILE_PATH,
         FILE_FORMAT,
+        SPEC_ID,
         required(PARTITION_ID, PARTITION_NAME, partitionType, PARTITION_DOC),
         RECORD_COUNT,
         FILE_SIZE,
diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java
index 28898439970e..84af7e7838af 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -235,45 +235,48 @@ public void put(int i, Object value) {
         this.format = FileFormat.valueOf(value.toString());
         return;
       case 3:
-        this.partitionData = (PartitionData) value;
+        this.partitionSpecId = (value != null) ? (Integer) value : -1;
         return;
       case 4:
-        this.recordCount = (Long) value;
+        this.partitionData = (PartitionData) value;
         return;
       case 5:
-        this.fileSizeInBytes = (Long) value;
+        this.recordCount = (Long) value;
         return;
       case 6:
-        this.columnSizes = (Map<Integer, Long>) value;
+        this.fileSizeInBytes = (Long) value;
         return;
       case 7:
-        this.valueCounts = (Map<Integer, Long>) value;
+        this.columnSizes = (Map<Integer, Long>) value;
         return;
       case 8:
-        this.nullValueCounts = (Map<Integer, Long>) value;
+        this.valueCounts = (Map<Integer, Long>) value;
         return;
       case 9:
-        this.nanValueCounts = (Map<Integer, Long>) value;
+        this.nullValueCounts = (Map<Integer, Long>) value;
         return;
       case 10:
-        this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
+        this.nanValueCounts = (Map<Integer, Long>) value;
         return;
       case 11:
-        this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
+        this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
         return;
       case 12:
-        this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
+        this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
         return;
       case 13:
-        this.splitOffsets = ArrayUtil.toLongArray((List<Long>) value);
+        this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
         return;
       case 14:
-        this.equalityIds = ArrayUtil.toIntArray((List<Integer>) value);
+        this.splitOffsets = ArrayUtil.toLongArray((List<Long>) value);
         return;
       case 15:
-        this.sortOrderId = (Integer) value;
+        this.equalityIds = ArrayUtil.toIntArray((List<Integer>) value);
         return;
       case 16:
+        this.sortOrderId = (Integer) value;
+        return;
+      case 17:
         this.fileOrdinal = (long) value;
         return;
       default:
@@ -301,32 +304,34 @@ public Object get(int i) {
       case 2:
         return format != null ? format.toString() : null;
       case 3:
-        return partitionData;
+        return partitionSpecId;
       case 4:
-        return recordCount;
+        return partitionData;
       case 5:
-        return fileSizeInBytes;
+        return recordCount;
       case 6:
-        return columnSizes;
+        return fileSizeInBytes;
       case 7:
-        return valueCounts;
+        return columnSizes;
       case 8:
-        return nullValueCounts;
+        return valueCounts;
       case 9:
-        return nanValueCounts;
+        return nullValueCounts;
       case 10:
-        return lowerBounds;
+        return nanValueCounts;
      case 11:
-        return upperBounds;
+        return lowerBounds;
       case 12:
-        return keyMetadata();
+        return upperBounds;
       case 13:
-        return splitOffsets();
+        return keyMetadata();
       case 14:
-        return equalityFieldIds();
+        return splitOffsets();
       case 15:
-        return sortOrderId;
+        return equalityFieldIds();
       case 16:
+        return sortOrderId;
+      case 17:
         return pos;
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
@@ -442,6 +447,7 @@ public String toString() {
         .add("content", content.toString().toLowerCase(Locale.ROOT))
         .add("file_path", filePath)
         .add("file_format", format)
+        .add("spec_id", specId())
         .add("partition", partitionData)
         .add("record_count", recordCount)
         .add("file_size_in_bytes", fileSizeInBytes)
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 93a3bf13189f..eb6f5841fd62 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -153,7 +153,7 @@ public void testEntriesTable() throws Exception {
       rows.forEach(row -> {
         row.put(2, 0L);
         GenericData.Record file = (GenericData.Record) row.get("data_file");
-        file.put(0, FileContent.DATA.id());
+        asMetadataRecord(file);
         expected.add(row);
       });
     }
@@ -311,7 +311,7 @@ public void testAllEntriesTable() throws Exception {
       rows.forEach(row -> {
         row.put(2, 0L);
         GenericData.Record file = (GenericData.Record) row.get("data_file");
-        file.put(0, FileContent.DATA.id());
+        asMetadataRecord(file);
         expected.add(row);
       });
     }
@@ -386,7 +386,7 @@ public void testFilesTable() throws Exception {
       for (GenericData.Record record : rows) {
         if ((Integer) record.get("status") < 2 /* added or existing */) {
           GenericData.Record file = (GenericData.Record) record.get("data_file");
-          file.put(0, FileContent.DATA.id());
+          asMetadataRecord(file);
           expected.add(file);
         }
       }
@@ -442,7 +442,7 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception {
     try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
       for (GenericData.Record record : rows) {
         GenericData.Record file = (GenericData.Record) record.get("data_file");
-        file.put(0, FileContent.DATA.id());
+        asMetadataRecord(file);
         expected.add(file);
       }
     }
@@ -549,7 +549,7 @@ public void testFilesUnpartitionedTable() throws Exception {
       for (GenericData.Record record : rows) {
         if ((Integer) record.get("status") < 2 /* added or existing */) {
           GenericData.Record file = (GenericData.Record) record.get("data_file");
-          file.put(0, FileContent.DATA.id());
+          asMetadataRecord(file);
           expected.add(file);
         }
       }
@@ -646,7 +646,7 @@ public void testAllDataFilesTable() throws Exception {
       for (GenericData.Record record : rows) {
         if ((Integer) record.get("status") < 2 /* added or existing */) {
           GenericData.Record file = (GenericData.Record) record.get("data_file");
-          file.put(0, FileContent.DATA.id());
+          asMetadataRecord(file);
           expected.add(file);
         }
       }
@@ -1411,4 +1411,9 @@ public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
 
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  private void asMetadataRecord(GenericData.Record file) {
+    file.put(0, FileContent.DATA.id());
+    file.put(3, 0); // specId
+  }
 }
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
index 111e908b90ea..6e55d1f78976 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
@@ -342,7 +342,7 @@ private Set<String> extractFilePathsMatchingConditionOnPartition(List<Row> files
     // idx 1: file_path, idx 3: partition
     return files.stream()
         .filter(r -> {
-          Row partition = r.getStruct(3);
+          Row partition = r.getStruct(4);
           return condition.test(partition);
         }).map(r -> CountOpenLocalFileSystem.stripScheme(r.getString(1)))
         .collect(Collectors.toSet());
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index fc6d1c5f6b64..532052d45511 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -21,6 +21,7 @@
 
 import java.util.Comparator;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.AssertHelpers;
@@ -140,7 +141,7 @@ public void testEntriesTable() throws Exception {
       rows.forEach(row -> {
         row.put(2, 0L);
         GenericData.Record file = (GenericData.Record) row.get("data_file");
-        file.put(0, FileContent.DATA.id());
+        asMetadataRecord(file);
         expected.add(row);
       });
     }
@@ -298,7 +299,7 @@ public void testAllEntriesTable() throws Exception {
       rows.forEach(row -> {
         row.put(2, 0L);
         GenericData.Record file = (GenericData.Record) row.get("data_file");
-        file.put(0, FileContent.DATA.id());
+        asMetadataRecord(file);
         expected.add(row);
       });
     }
@@ -373,7 +374,7 @@ public void testFilesTable() throws Exception {
       for (GenericData.Record record : rows) {
         if ((Integer) record.get("status") < 2 /* added or existing */) {
           GenericData.Record file = (GenericData.Record) record.get("data_file");
-          file.put(0, FileContent.DATA.id());
+          asMetadataRecord(file);
           expected.add(file);
         }
       }
@@ -429,7 +430,7 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception {
     try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
       for (GenericData.Record record : rows) {
         GenericData.Record file = (GenericData.Record) record.get("data_file");
-        file.put(0, FileContent.DATA.id());
+        asMetadataRecord(file);
         expected.add(file);
       }
     }
@@ -536,7 +537,7 @@ public void testFilesUnpartitionedTable() throws Exception {
       for (GenericData.Record record : rows) {
         if ((Integer) record.get("status") < 2 /* added or existing */) {
           GenericData.Record file = (GenericData.Record) record.get("data_file");
-          file.put(0, FileContent.DATA.id());
+          asMetadataRecord(file);
           expected.add(file);
         }
       }
@@ -633,7 +634,7 @@ public void testAllDataFilesTable() throws Exception {
       for (GenericData.Record record : rows) {
         if ((Integer) record.get("status") < 2 /* added or existing */) {
           GenericData.Record file = (GenericData.Record) record.get("data_file");
-          file.put(0, FileContent.DATA.id());
+          asMetadataRecord(file);
           expected.add(file);
         }
       }
@@ -1181,4 +1182,44 @@ public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
 
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  @Test
+  public void testFilesTablePartitionId() throws Exception {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
+    int spec0 = table.spec().specId();
+
+    Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+    Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+
+    df1.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    // change partition spec
+    table.refresh();
+    table.updateSpec().removeField("id").commit();
+    int spec1 = table.spec().specId();
+
+    // add a second file
+    df2.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    List<Integer> actual = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier, "files"))
+        .sort(DataFile.SPEC_ID.name())
+        .collectAsList()
+        .stream().map(r -> (Integer) r.getAs(DataFile.SPEC_ID.name())).collect(Collectors.toList());
+
+    Assert.assertEquals("Should have two partition specs", ImmutableList.of(spec0, spec1), actual);
+  }
+
+  private void asMetadataRecord(GenericData.Record file) {
+    file.put(0, FileContent.DATA.id());
+    file.put(3, 0); // specId
+  }
 }
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
index 111e908b90ea..6e55d1f78976 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
@@ -342,7 +342,7 @@ private Set<String> extractFilePathsMatchingConditionOnPartition(List<Row> files
     // idx 1: file_path, idx 3: partition
     return files.stream()
         .filter(r -> {
-          Row partition = r.getStruct(3);
+          Row partition = r.getStruct(4);
           return condition.test(partition);
         }).map(r -> CountOpenLocalFileSystem.stripScheme(r.getString(1)))
         .collect(Collectors.toSet());
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 10b9d6f3030c..d88dd018c1f0 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -21,6 +21,7 @@
 
 import java.util.Comparator;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.AssertHelpers;
@@ -139,7 +140,7 @@ public void testEntriesTable() throws Exception {
       rows.forEach(row -> {
         row.put(2, 0L);
         GenericData.Record file = (GenericData.Record) row.get("data_file");
-        file.put(0, FileContent.DATA.id());
+        asMetadataRecord(file);
         expected.add(row);
       });
     }
@@ -297,7 +298,7 @@ public void testAllEntriesTable() throws Exception {
       rows.forEach(row -> {
         row.put(2, 0L);
         GenericData.Record file = (GenericData.Record) row.get("data_file");
-        file.put(0, FileContent.DATA.id());
+        asMetadataRecord(file);
         expected.add(row);
       });
     }
@@ -372,7 +373,7 @@ public void testFilesTable() throws Exception {
       for (GenericData.Record record : rows) {
         if ((Integer) record.get("status") < 2 /* added or existing */) {
           GenericData.Record file = (GenericData.Record) record.get("data_file");
-          file.put(0, FileContent.DATA.id());
+          asMetadataRecord(file);
           expected.add(file);
         }
       }
@@ -428,7 +429,7 @@ public void testFilesTableWithSnapshotIdInheritance() throws Exception {
     try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
       for (GenericData.Record record : rows) {
         GenericData.Record file = (GenericData.Record) record.get("data_file");
-        file.put(0, FileContent.DATA.id());
+        asMetadataRecord(file);
         expected.add(file);
       }
     }
@@ -535,7 +536,7 @@ public void testFilesUnpartitionedTable() throws Exception {
       for (GenericData.Record record : rows) {
         if ((Integer) record.get("status") < 2 /* added or existing */) {
           GenericData.Record file = (GenericData.Record) record.get("data_file");
-          file.put(0, FileContent.DATA.id());
+          asMetadataRecord(file);
           expected.add(file);
         }
       }
@@ -632,7 +633,7 @@ public void testAllDataFilesTable() throws Exception {
       for (GenericData.Record record : rows) {
         if ((Integer) record.get("status") < 2 /* added or existing */) {
           GenericData.Record file = (GenericData.Record) record.get("data_file");
-          file.put(0, FileContent.DATA.id());
+          asMetadataRecord(file);
           expected.add(file);
         }
       }
@@ -1180,4 +1181,45 @@ public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
 
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+
+  @Test
+  public void testFilesTablePartitionId() throws Exception {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
+    int spec0 = table.spec().specId();
+
+    Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+    Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+
+    df1.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    // change partition spec
+    table.refresh();
+    table.updateSpec().removeField("id").commit();
+    int spec1 = table.spec().specId();
+
+    // add a second file
+    df2.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    List<Integer> actual = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier, "files"))
+        .sort(DataFile.SPEC_ID.name())
+        .collectAsList()
+        .stream().map(r -> (Integer) r.getAs(DataFile.SPEC_ID.name())).collect(Collectors.toList());
+
+    Assert.assertEquals("Should have two partition specs", ImmutableList.of(spec0, spec1), actual);
+  }
+
+  private void asMetadataRecord(GenericData.Record file) {
+    file.put(0, FileContent.DATA.id());
+    file.put(3, 0); // specId
+  }
 }
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
index 111e908b90ea..6e55d1f78976 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
@@ -342,7 +342,7 @@ private Set<String> extractFilePathsMatchingConditionOnPartition(List<Row> files
     // idx 1: file_path, idx 3: partition
     return files.stream()
         .filter(r -> {
-          Row partition = r.getStruct(3);
+          Row partition = r.getStruct(4);
           return condition.test(partition);
         }).map(r -> CountOpenLocalFileSystem.stripScheme(r.getString(1)))
        .collect(Collectors.toSet());
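
For reviewers who want to see how the new column surfaces to readers, below is a minimal sketch of querying `spec_id` from the `files` metadata table with the Spark Java API, mirroring the `testFilesTablePartitionId` test added above. The table name `db.files_test`, the SparkSession setup, and the class name are placeholders chosen for illustration, not part of this change; it assumes an Iceberg catalog is already configured for the session.

```java
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FilesTableSpecIdExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("files-table-spec-id")
        .getOrCreate();

    // Load the "files" metadata table of an Iceberg table registered in the configured catalog.
    Dataset<Row> files = spark.read()
        .format("iceberg")
        .load("db.files_test.files");

    // With this change, each row exposes spec_id next to the existing file metadata columns.
    List<Row> rows = files.select("file_path", "spec_id").collectAsList();
    for (Row row : rows) {
      System.out.println(row.getString(0) + " was written with partition spec " + row.getInt(1));
    }
  }
}
```

Files keep the `spec_id` of the spec they were written under even after the table's partition spec evolves, which is what the new test asserts by expecting both `spec0` and `spec1` in the column.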