From a5a92a9aa0e8fc163086062457f7b93777cd729b Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Wed, 6 Apr 2022 17:39:26 +0200 Subject: [PATCH 1/3] initial change --- .../java/org/apache/iceberg/PartitionsTable.java | 12 ++++++++---- .../org/apache/iceberg/TestMetadataTableScans.java | 2 +- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java index e0b89df6d714..85f5df4f2ce1 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java +++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java @@ -45,9 +45,10 @@ public class PartitionsTable extends BaseMetadataTable { super(ops, table, name); this.schema = new Schema( - Types.NestedField.required(1, "partition", table.spec().partitionType()), - Types.NestedField.required(2, "record_count", Types.LongType.get()), - Types.NestedField.required(3, "file_count", Types.IntegerType.get()) + Types.NestedField.required(1, "spec_id", Types.IntegerType.get()), + Types.NestedField.required(2, "partition", Partitioning.partitionType(table)), + Types.NestedField.required(3, "record_count", Types.LongType.get()), + Types.NestedField.required(4, "file_count", Types.IntegerType.get()) ); } @@ -89,7 +90,7 @@ private DataTask task(StaticTableScan scan) { } private static StaticDataTask.Row convertPartition(Partition partition) { - return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount); + return StaticDataTask.Row.of(partition.specId, partition.key, partition.recordCount, partition.fileCount); } private static Iterable partitions(StaticTableScan scan) { @@ -164,17 +165,20 @@ Iterable all() { } static class Partition { + private int specId; private final StructLike key; private long recordCount; private int fileCount; Partition(StructLike key) { + this.specId = 0; this.key = key; this.recordCount = 0; this.fileCount = 0; } void update(DataFile file) { + this.specId = file.specId(); this.recordCount += file.recordCount(); this.fileCount += 1; } diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java index 4846ad43579d..ace9d6f232e9 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java @@ -364,7 +364,7 @@ public void testPartitionsTableScanNoFilter() { Table partitionsTable = new PartitionsTable(table.ops(), table); Types.StructType expected = new Schema( - required(1, "partition", Types.StructType.of( + required(2, "partition", Types.StructType.of( optional(1000, "data_bucket", Types.IntegerType.get())))).asStruct(); TableScan scanNoFilter = partitionsTable.newScan().select("partition.data_bucket"); From fc40183a426206fb227403f480d89118336f3752 Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Thu, 7 Apr 2022 17:45:29 +0200 Subject: [PATCH 2/3] Spark test fixes --- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 6 ++++-- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 6 ++++-- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 6 ++++-- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 6 ++++-- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 791e63e4506c..975d7ed10d40 100644 --- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -1049,8 +1049,8 @@ public void testUnpartitionedPartitionsTable() { .save(loadLocation(tableIdentifier)); Types.StructType expectedSchema = Types.StructType.of( - required(2, "record_count", Types.LongType.get()), - required(3, "file_count", Types.IntegerType.get())); + required(3, "record_count", Types.LongType.get()), + required(4, "file_count", Types.IntegerType.get())); Table partitionsTable = loadTable(tableIdentifier, "partitions"); @@ -1107,11 +1107,13 @@ public void testPartitionsTable() { partitionsTable.schema().findType("partition").asStructType(), "partition")); List expected = Lists.newArrayList(); expected.add(builder + .set("spec_id", 0) .set("partition", partitionBuilder.set("id", 1).build()) .set("record_count", 1L) .set("file_count", 1) .build()); expected.add(builder + .set("spec_id", 0) .set("partition", partitionBuilder.set("id", 2).build()) .set("record_count", 1L) .set("file_count", 1) diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 78137d139dbf..07b46c80d4af 100644 --- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -1050,8 +1050,8 @@ public void testUnpartitionedPartitionsTable() { .save(loadLocation(tableIdentifier)); Types.StructType expectedSchema = Types.StructType.of( - required(2, "record_count", Types.LongType.get()), - required(3, "file_count", Types.IntegerType.get())); + required(3, "record_count", Types.LongType.get()), + required(4, "file_count", Types.IntegerType.get())); Table partitionsTable = loadTable(tableIdentifier, "partitions"); @@ -1108,11 +1108,13 @@ public void testPartitionsTable() { partitionsTable.schema().findType("partition").asStructType(), "partition")); List expected = Lists.newArrayList(); expected.add(builder + .set("spec_id", 0) .set("partition", partitionBuilder.set("id", 1).build()) .set("record_count", 1L) .set("file_count", 1) .build()); expected.add(builder + .set("spec_id", 0) .set("partition", partitionBuilder.set("id", 2).build()) .set("record_count", 1L) .set("file_count", 1) diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 78137d139dbf..07b46c80d4af 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -1050,8 +1050,8 @@ public void testUnpartitionedPartitionsTable() { .save(loadLocation(tableIdentifier)); Types.StructType expectedSchema = Types.StructType.of( - required(2, "record_count", Types.LongType.get()), - required(3, "file_count", Types.IntegerType.get())); + required(3, "record_count", Types.LongType.get()), + required(4, "file_count", Types.IntegerType.get())); Table partitionsTable = loadTable(tableIdentifier, "partitions"); @@ -1108,11 +1108,13 @@ public void testPartitionsTable() { partitionsTable.schema().findType("partition").asStructType(), "partition")); List expected = Lists.newArrayList(); expected.add(builder + .set("spec_id", 0) .set("partition", partitionBuilder.set("id", 1).build()) .set("record_count", 1L) .set("file_count", 1) .build()); expected.add(builder + .set("spec_id", 0) .set("partition", partitionBuilder.set("id", 2).build()) .set("record_count", 1L) .set("file_count", 1) diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index af6a75a34736..e5faea159612 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -1051,8 +1051,8 @@ public void testUnpartitionedPartitionsTable() { .save(loadLocation(tableIdentifier)); Types.StructType expectedSchema = Types.StructType.of( - required(2, "record_count", Types.LongType.get()), - required(3, "file_count", Types.IntegerType.get())); + required(3, "record_count", Types.LongType.get()), + required(4, "file_count", Types.IntegerType.get())); Table partitionsTable = loadTable(tableIdentifier, "partitions"); @@ -1109,11 +1109,13 @@ public void testPartitionsTable() { partitionsTable.schema().findType("partition").asStructType(), "partition")); List expected = Lists.newArrayList(); expected.add(builder + .set("spec_id", 0) .set("partition", partitionBuilder.set("id", 1).build()) .set("record_count", 1L) .set("file_count", 1) .build()); expected.add(builder + .set("spec_id", 0) .set("partition", partitionBuilder.set("id", 2).build()) .set("record_count", 1L) .set("file_count", 1) From db6e8a0666b355b4940a50105161d0b785bc9f69 Mon Sep 17 00:00:00 2001 From: Adam Szita Date: Mon, 11 Apr 2022 14:20:16 +0200 Subject: [PATCH 3/3] reordering spec_id --- .../java/org/apache/iceberg/PartitionsTable.java | 16 ++++++++-------- .../apache/iceberg/TestMetadataTableScans.java | 2 +- .../source/TestIcebergSourceTablesBase.java | 8 ++++---- .../source/TestIcebergSourceTablesBase.java | 8 ++++---- .../source/TestIcebergSourceTablesBase.java | 8 ++++---- .../source/TestIcebergSourceTablesBase.java | 8 ++++---- 6 files changed, 25 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java index 85f5df4f2ce1..ca81b19f1777 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java +++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java @@ -45,10 +45,10 @@ public class PartitionsTable extends BaseMetadataTable { super(ops, table, name); this.schema = new Schema( - Types.NestedField.required(1, "spec_id", Types.IntegerType.get()), - Types.NestedField.required(2, "partition", Partitioning.partitionType(table)), - Types.NestedField.required(3, "record_count", Types.LongType.get()), - Types.NestedField.required(4, "file_count", Types.IntegerType.get()) + Types.NestedField.required(1, "partition", Partitioning.partitionType(table)), + Types.NestedField.required(2, "record_count", Types.LongType.get()), + Types.NestedField.required(3, "file_count", Types.IntegerType.get()), + Types.NestedField.required(4, "spec_id", Types.IntegerType.get()) ); } @@ -90,7 +90,7 @@ private DataTask task(StaticTableScan scan) { } private static StaticDataTask.Row convertPartition(Partition partition) { - return StaticDataTask.Row.of(partition.specId, partition.key, partition.recordCount, partition.fileCount); + return StaticDataTask.Row.of(partition.key, partition.recordCount, partition.fileCount, partition.specId); } private static Iterable partitions(StaticTableScan scan) { @@ -165,22 +165,22 @@ Iterable all() { } static class Partition { - private int specId; private final StructLike key; private long recordCount; private int fileCount; + private int specId; Partition(StructLike key) { - this.specId = 0; this.key = key; this.recordCount = 0; this.fileCount = 0; + this.specId = 0; } void update(DataFile file) { - this.specId = file.specId(); this.recordCount += file.recordCount(); this.fileCount += 1; + this.specId = file.specId(); } } } diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java index ace9d6f232e9..4846ad43579d 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java @@ -364,7 +364,7 @@ public void testPartitionsTableScanNoFilter() { Table partitionsTable = new PartitionsTable(table.ops(), table); Types.StructType expected = new Schema( - required(2, "partition", Types.StructType.of( + required(1, "partition", Types.StructType.of( optional(1000, "data_bucket", Types.IntegerType.get())))).asStruct(); TableScan scanNoFilter = partitionsTable.newScan().select("partition.data_bucket"); diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 975d7ed10d40..30bfb89a1048 100644 --- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -1049,8 +1049,8 @@ public void testUnpartitionedPartitionsTable() { .save(loadLocation(tableIdentifier)); Types.StructType expectedSchema = Types.StructType.of( - required(3, "record_count", Types.LongType.get()), - required(4, "file_count", Types.IntegerType.get())); + required(2, "record_count", Types.LongType.get()), + required(3, "file_count", Types.IntegerType.get())); Table partitionsTable = loadTable(tableIdentifier, "partitions"); @@ -1107,16 +1107,16 @@ public void testPartitionsTable() { partitionsTable.schema().findType("partition").asStructType(), "partition")); List expected = Lists.newArrayList(); expected.add(builder - .set("spec_id", 0) .set("partition", partitionBuilder.set("id", 1).build()) .set("record_count", 1L) .set("file_count", 1) + .set("spec_id", 0) .build()); expected.add(builder - .set("spec_id", 0) .set("partition", partitionBuilder.set("id", 2).build()) .set("record_count", 1L) .set("file_count", 1) + .set("spec_id", 0) .build()); Assert.assertEquals("Partitions table should have two rows", 2, expected.size()); diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 07b46c80d4af..89764018ffc5 100644 --- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -1050,8 +1050,8 @@ public void testUnpartitionedPartitionsTable() { .save(loadLocation(tableIdentifier)); Types.StructType expectedSchema = Types.StructType.of( - required(3, "record_count", Types.LongType.get()), - required(4, "file_count", Types.IntegerType.get())); + required(2, "record_count", Types.LongType.get()), + required(3, "file_count", Types.IntegerType.get())); Table partitionsTable = loadTable(tableIdentifier, "partitions"); @@ -1108,16 +1108,16 @@ public void testPartitionsTable() { partitionsTable.schema().findType("partition").asStructType(), "partition")); List expected = Lists.newArrayList(); expected.add(builder - .set("spec_id", 0) .set("partition", partitionBuilder.set("id", 1).build()) .set("record_count", 1L) .set("file_count", 1) + .set("spec_id", 0) .build()); expected.add(builder - .set("spec_id", 0) .set("partition", partitionBuilder.set("id", 2).build()) .set("record_count", 1L) .set("file_count", 1) + .set("spec_id", 0) .build()); Assert.assertEquals("Partitions table should have two rows", 2, expected.size()); diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 07b46c80d4af..89764018ffc5 100644 --- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -1050,8 +1050,8 @@ public void testUnpartitionedPartitionsTable() { .save(loadLocation(tableIdentifier)); Types.StructType expectedSchema = Types.StructType.of( - required(3, "record_count", Types.LongType.get()), - required(4, "file_count", Types.IntegerType.get())); + required(2, "record_count", Types.LongType.get()), + required(3, "file_count", Types.IntegerType.get())); Table partitionsTable = loadTable(tableIdentifier, "partitions"); @@ -1108,16 +1108,16 @@ public void testPartitionsTable() { partitionsTable.schema().findType("partition").asStructType(), "partition")); List expected = Lists.newArrayList(); expected.add(builder - .set("spec_id", 0) .set("partition", partitionBuilder.set("id", 1).build()) .set("record_count", 1L) .set("file_count", 1) + .set("spec_id", 0) .build()); expected.add(builder - .set("spec_id", 0) .set("partition", partitionBuilder.set("id", 2).build()) .set("record_count", 1L) .set("file_count", 1) + .set("spec_id", 0) .build()); Assert.assertEquals("Partitions table should have two rows", 2, expected.size()); diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index e5faea159612..c47c4c0a2db2 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -1051,8 +1051,8 @@ public void testUnpartitionedPartitionsTable() { .save(loadLocation(tableIdentifier)); Types.StructType expectedSchema = Types.StructType.of( - required(3, "record_count", Types.LongType.get()), - required(4, "file_count", Types.IntegerType.get())); + required(2, "record_count", Types.LongType.get()), + required(3, "file_count", Types.IntegerType.get())); Table partitionsTable = loadTable(tableIdentifier, "partitions"); @@ -1109,16 +1109,16 @@ public void testPartitionsTable() { partitionsTable.schema().findType("partition").asStructType(), "partition")); List expected = Lists.newArrayList(); expected.add(builder - .set("spec_id", 0) .set("partition", partitionBuilder.set("id", 1).build()) .set("record_count", 1L) .set("file_count", 1) + .set("spec_id", 0) .build()); expected.add(builder - .set("spec_id", 0) .set("partition", partitionBuilder.set("id", 2).build()) .set("record_count", 1L) .set("file_count", 1) + .set("spec_id", 0) .build()); Assert.assertEquals("Partitions table should have two rows", 2, expected.size());