diff --git a/core/src/main/java/org/apache/iceberg/AllDeleteFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDeleteFilesTable.java
new file mode 100644
index 000000000000..7b00a68d5154
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/AllDeleteFilesTable.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.io.CloseableIterable;
+
+/**
+ * A {@link Table} implementation that exposes its valid delete files as rows.
+ * <p>
+ * A valid delete file is one that is readable from any snapshot currently tracked by the table.
+ * <p>
+ * This table may return duplicate rows.
+ */
+public class AllDeleteFilesTable extends BaseFilesTable {
+
+  AllDeleteFilesTable(TableOperations ops, Table table) {
+    this(ops, table, table.name() + ".all_delete_files");
+  }
+
+  AllDeleteFilesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new AllDeleteFilesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.ALL_DELETE_FILES;
+  }
+
+  public static class AllDeleteFilesTableScan extends BaseAllFilesTableScan {
+
+    AllDeleteFilesTableScan(TableOperations ops, Table table, Schema schema) {
+      super(ops, table, schema, MetadataTableType.ALL_DELETE_FILES);
+    }
+
+    private AllDeleteFilesTableScan(TableOperations ops, Table table, Schema schema,
+                                    TableScanContext context) {
+      super(ops, table, schema, MetadataTableType.ALL_DELETE_FILES, context);
+    }
+
+    @Override
+    protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+      return new AllDeleteFilesTableScan(ops, table, schema, context);
+    }
+
+    @Override
+    protected CloseableIterable<ManifestFile> manifests() {
+      return reachableManifests(Snapshot::deleteManifests);
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/AllFilesTable.java b/core/src/main/java/org/apache/iceberg/AllFilesTable.java
new file mode 100644
index 000000000000..e4eccc8f52dd
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/AllFilesTable.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.io.CloseableIterable;
+
+/**
+ * A {@link Table} implementation that exposes its valid files as rows.
+ * <p>
+ * A valid file is one that is readable from any snapshot currently tracked by the table.
+ * <p>
+ * This table may return duplicate rows.
+ */
+public class AllFilesTable extends BaseFilesTable {
+
+  AllFilesTable(TableOperations ops, Table table) {
+    this(ops, table, table.name() + ".all_files");
+  }
+
+  AllFilesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new AllFilesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.ALL_FILES;
+  }
+
+  public static class AllFilesTableScan extends BaseAllFilesTableScan {
+
+    AllFilesTableScan(TableOperations ops, Table table, Schema schema) {
+      super(ops, table, schema, MetadataTableType.ALL_FILES);
+    }
+
+    private AllFilesTableScan(TableOperations ops, Table table, Schema schema,
+                              TableScanContext context) {
+      super(ops, table, schema, MetadataTableType.ALL_FILES, context);
+    }
+
+    @Override
+    protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+      return new AllFilesTableScan(ops, table, schema, context);
+    }
+
+    @Override
+    protected CloseableIterable<ManifestFile> manifests() {
+      return reachableManifests(Snapshot::allManifests);
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableType.java b/core/src/main/java/org/apache/iceberg/MetadataTableType.java
index d6df05736fcb..c1c4bf9c75ec 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataTableType.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataTableType.java
@@ -31,6 +31,8 @@ public enum MetadataTableType {
   MANIFESTS,
   PARTITIONS,
   ALL_DATA_FILES,
+  ALL_DELETE_FILES,
+  ALL_FILES,
   ALL_MANIFESTS,
   ALL_ENTRIES;
diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
index 785fb4d239ef..a289c29b1580 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
@@ -69,6 +69,10 @@ private static Table createMetadataTableInstance(TableOperations ops, Table base
         return new PartitionsTable(ops, baseTable, metadataTableName);
       case ALL_DATA_FILES:
         return new AllDataFilesTable(ops, baseTable, metadataTableName);
+      case ALL_DELETE_FILES:
+        return new AllDeleteFilesTable(ops, baseTable, metadataTableName);
+      case ALL_FILES:
+        return new AllFilesTable(ops, baseTable, metadataTableName);
       case ALL_MANIFESTS:
         return new AllManifestsTable(ops, baseTable, metadataTableName);
       case ALL_ENTRIES:
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java
index 9edc379d4d71..0f1ead9a59bb 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java
@@ -19,12 +19,14 @@
 package org.apache.iceberg;
 
+import java.util.Set;
 import java.util.stream.StreamSupport;
 import org.apache.iceberg.BaseFilesTable.ManifestReadTask;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
@@ -40,6 +42,10 @@
 @RunWith(Parameterized.class)
 public class TestMetadataTableFilters extends TableTestBase {
 
+  private static final Set<MetadataTableType> aggFileTables = Sets.newHashSet(MetadataTableType.ALL_DATA_FILES,
+      MetadataTableType.ALL_DELETE_FILES,
+      MetadataTableType.ALL_FILES);
+
   private final MetadataTableType type;
 
   @Parameterized.Parameters(name = "table_type = {0}, format = {1}")
@@ -51,7 +57,10 @@ public static Object[][] parameters() {
       { MetadataTableType.FILES, 1 },
       { MetadataTableType.FILES, 2 },
       { MetadataTableType.ALL_DATA_FILES, 1 },
-      { MetadataTableType.ALL_DATA_FILES, 2 }
+      { MetadataTableType.ALL_DATA_FILES, 2 },
+      { MetadataTableType.ALL_DELETE_FILES, 2 },
+      { MetadataTableType.ALL_FILES, 1 },
+      { MetadataTableType.ALL_FILES, 2 }
     };
   }
@@ -95,7 +104,7 @@ public void setupTable() throws Exception {
           .commit();
     }
 
-    if (type.equals(MetadataTableType.ALL_DATA_FILES)) {
+    if (isAggFileTable(type)) {
       // Clear all files from current snapshot to test whether 'all' Files tables scans previous files
       table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); // Moves file entries to DELETED state
       table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); // Removes all entries
@@ -114,6 +123,10 @@ private Table createMetadataTable() {
         return new DeleteFilesTable(table.ops(), table);
       case ALL_DATA_FILES:
         return new AllDataFilesTable(table.ops(), table);
+      case ALL_DELETE_FILES:
+        return new AllDeleteFilesTable(table.ops(), table);
+      case ALL_FILES:
+        return new AllFilesTable(table.ops(), table);
       default:
         throw new IllegalArgumentException("Unsupported metadata table type:" + type);
     }
@@ -129,14 +142,25 @@ private int expectedScanTaskCount(int partitions) {
       }
       case DATA_FILES:
       case DELETE_FILES:
+      case ALL_DELETE_FILES:
         return partitions;
       case ALL_DATA_FILES:
         return partitions * 2; // ScanTask for Data Manifest in DELETED and ADDED states
+      case ALL_FILES:
+        if (formatVersion == 1) {
+          return partitions * 2; // ScanTask for Data Manifest in DELETED and ADDED states
+        } else {
+          return partitions * 4; // ScanTask for Delete and Data File in DELETED and ADDED states
+        }
       default:
         throw new IllegalArgumentException("Unsupported metadata table type:" + type);
     }
   }
 
+  private boolean isAggFileTable(MetadataTableType tableType) {
+    return aggFileTables.contains(tableType);
+  }
+
   @Test
   public void testNoFilter() {
     Table metadataTable = createMetadataTable();
@@ -288,7 +312,7 @@ public void testPartitionSpecEvolutionRemovalV1() {
     table.newFastAppend().appendFile(data10).commit();
     table.newFastAppend().appendFile(data11).commit();
 
-    if (type.equals(MetadataTableType.ALL_DATA_FILES)) {
+    if (isAggFileTable(type)) {
       // Clear all files from current snapshot to test whether 'all' Files tables scans previous files
       table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); // Moves file entries to DELETED state
       table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); // Removes all entries
@@ -363,7 +387,7 @@ public void testPartitionSpecEvolutionRemovalV2() {
       table.newRowDelta().addDeletes(delete11).commit();
     }
 
-    if (type.equals(MetadataTableType.ALL_DATA_FILES)) {
+    if (isAggFileTable(type)) {
      // Clear all files from current snapshot to test whether 'all' Files tables scans previous files
       table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); // Moves file entries to DELETED state
       table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); // Removes all entries
@@ -424,7 +448,7 @@ public void testPartitionSpecEvolutionAdditiveV1() {
     table.newFastAppend().appendFile(data10).commit();
     table.newFastAppend().appendFile(data11).commit();
 
-    if (type.equals(MetadataTableType.ALL_DATA_FILES)) {
+    if (isAggFileTable(type)) {
       // Clear all files from current snapshot to test whether 'all' Files tables scans previous files
       table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); // Moves file entries to DELETED state
       table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); // Removes all entries
@@ -499,7 +523,7 @@ public void testPartitionSpecEvolutionAdditiveV2() {
       table.newRowDelta().addDeletes(delete11).commit();
     }
 
-    if (type.equals(MetadataTableType.ALL_DATA_FILES)) {
+    if (isAggFileTable(type)) {
       // Clear all files from current snapshot to test whether 'all' Files tables scans previous files
       table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); // Moves file entries to DELETED state
       table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit(); // Removes all entries
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index 968ed79dfed6..ea4cee2a61aa 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -20,11 +20,13 @@
 package org.apache.iceberg.spark.extensions;
 
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.avro.generic.GenericData.Record;
+import org.apache.commons.collections.ListUtils;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.Schema;
@@ -169,7 +171,7 @@ public void testPartitionedTable() throws Exception {
     Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size());
     TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedDataFiles.get(0), actualDataFiles.get(0));
 
-    // Check all files table
+    // Check files table
     List<Record> expectedFiles = Stream.concat(expectedDataFiles.stream(), expectedDeleteFiles.stream())
         .collect(Collectors.toList());
     Assert.assertEquals("Should have two file manifest entries", 2, expectedFiles.size());
@@ -182,7 +184,7 @@ public void testPartitionedTable() throws Exception {
   }
 
   @Test
-  public void testAllFiles() throws Exception {
+  public void testAllFilesUnpartitioned() throws Exception {
     sql("CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES" +
         "('format-version'='2', 'write.delete.mode'='merge-on-read')", tableName);
 
@@ -197,10 +199,14 @@ public void testAllFilesUnpartitioned() throws Exception {
         .writeTo(tableName)
         .append();
 
+    // Create delete file
+    sql("DELETE FROM %s WHERE id=1", tableName);
     Table table = Spark3Util.loadIcebergTable(spark, tableName);
     List<ManifestFile> expectedDataManifests = TestHelpers.dataManifests(table);
     Assert.assertEquals("Should have 1 data manifest", 1, expectedDataManifests.size());
+    List<ManifestFile> expectedDeleteManifests = TestHelpers.deleteManifests(table);
+    Assert.assertEquals("Should have 1 delete manifest", 1, expectedDeleteManifests.size());
 
     // Clear table to test whether 'all_files' can read past files
     List<Object[]> results = sql("DELETE FROM %s", tableName);
@@ -209,19 +215,35 @@ public void testAllFilesUnpartitioned() throws Exception {
     Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
     Schema filesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".all_data_files").schema();
 
+    // Check all data files table
     List<Row> actualDataFiles = spark.sql("SELECT * FROM " + tableName + ".all_data_files").collectAsList();
     List<Record> expectedDataFiles = expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
-
     Assert.assertEquals("Should be one data file manifest entry", 1, expectedDataFiles.size());
     Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size());
-
     TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedDataFiles.get(0), actualDataFiles.get(0));
+
+    // Check all delete files table
+    List<Row> actualDeleteFiles = spark.sql("SELECT * FROM " + tableName + ".all_delete_files").collectAsList();
+    List<Record> expectedDeleteFiles = expectedEntries(table, FileContent.POSITION_DELETES,
+        entriesTableSchema, expectedDeleteManifests, null);
+    Assert.assertEquals("Should be one delete file manifest entry", 1, expectedDeleteFiles.size());
+    Assert.assertEquals("Metadata table should return one delete file", 1, actualDeleteFiles.size());
+    TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedDeleteFiles.get(0), actualDeleteFiles.get(0));
+
+    // Check all files table
+    List<Row> actualFiles = spark.sql("SELECT * FROM " + tableName + ".all_files ORDER BY content")
+        .collectAsList();
+    List<Record> expectedFiles = ListUtils.union(expectedDataFiles, expectedDeleteFiles);
+    expectedFiles.sort(Comparator.comparing(r -> ((Integer) r.get("content"))));
+    Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size());
+    TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedFiles, actualFiles);
   }
 
   @Test
   public void testAllFilesPartitioned() throws Exception {
+    // Create table and insert data
     sql("CREATE TABLE %s (id bigint, data string) " +
         "USING iceberg " +
         "PARTITIONED BY (data) " +
@@ -246,27 +268,48 @@ public void testAllFilesPartitioned() throws Exception {
         .writeTo(tableName)
         .append();
 
+    // Create delete file
+    sql("DELETE FROM %s WHERE id=1", tableName);
+
     Table table = Spark3Util.loadIcebergTable(spark, tableName);
     List<ManifestFile> expectedDataManifests = TestHelpers.dataManifests(table);
     Assert.assertEquals("Should have 2 data manifests", 2, expectedDataManifests.size());
+    List<ManifestFile> expectedDeleteManifests = TestHelpers.deleteManifests(table);
+    Assert.assertEquals("Should have 1 delete manifest", 1, expectedDeleteManifests.size());
 
     // Clear table to test whether 'all_files' can read past files
     List<Object[]> results = sql("DELETE FROM %s", tableName);
     Assert.assertEquals("Table should be cleared", 0, results.size());
 
-    List<Row> actualDataFiles = spark.sql("SELECT * FROM " + tableName + ".all_data_files " +
-        "WHERE partition.data='a'").collectAsList();
-
     Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
     Schema filesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".all_data_files").schema();
 
+    // Check all data files table
+    List<Row> actualDataFiles = spark.sql("SELECT * FROM " + tableName + ".all_data_files " +
+        "WHERE partition.data='a'").collectAsList();
     List<Record> expectedDataFiles = expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, "a");
-
     Assert.assertEquals("Should be one data file manifest entry", 1, expectedDataFiles.size());
     Assert.assertEquals("Metadata table should return one data file", 1, actualDataFiles.size());
-
     TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedDataFiles.get(0), actualDataFiles.get(0));
+
+    // Check all delete files table
+    List<Row> actualDeleteFiles = spark.sql("SELECT * FROM " + tableName + ".all_delete_files " +
+        "WHERE partition.data='a'").collectAsList();
+    List<Record> expectedDeleteFiles = expectedEntries(table, FileContent.POSITION_DELETES,
+        entriesTableSchema, expectedDeleteManifests, "a");
+    Assert.assertEquals("Should be one delete file manifest entry", 1, expectedDeleteFiles.size());
+    Assert.assertEquals("Metadata table should return one delete file", 1, actualDeleteFiles.size());
+
+    TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedDeleteFiles.get(0), actualDeleteFiles.get(0));
+
+    // Check all files table
+    List<Row> actualFiles = spark.sql("SELECT * FROM " + tableName + ".all_files WHERE partition.data='a' " +
+        "ORDER BY content").collectAsList();
+    List<Record> expectedFiles = ListUtils.union(expectedDataFiles, expectedDeleteFiles);
+    expectedFiles.sort(Comparator.comparing(r -> ((Integer) r.get("content"))));
+    Assert.assertEquals("Metadata table should return two files", 2, actualFiles.size());
+    TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedFiles, actualFiles);
   }
 
   /**
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index edc64cb2a8b4..a726d58e28f2 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -45,6 +45,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -79,6 +80,10 @@ public class TestHelpers {
   private TestHelpers() {
   }
 
+  public static void assertEqualsSafe(Types.StructType struct, List<Record> recs, List<Row> rows) {
+    Streams.forEachPair(recs.stream(), rows.stream(), (rec, row) -> assertEqualsSafe(struct, rec, row));
+  }
+
   public static void assertEqualsSafe(Types.StructType struct, Record rec, Row row) {
     List<Types.NestedField> fields = struct.fields();
     for (int i = 0; i < fields.size(); i += 1) {
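
Usage note: with this patch applied, the new metadata tables are queried exactly like the existing all_data_files table. Below is a minimal sketch, not a definitive implementation: the table name "db.sample" is illustrative, the SparkSession and base Table handles are assumed to already exist, and the core-API path assumes the public two-argument MetadataTableUtils.createMetadataTableInstance overload that exists alongside the private one changed above.

import java.util.List;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class AllFilesTablesExample {

  // Inspect reachable delete files two ways: via Spark SQL and via the core metadata-table API.
  static void inspectDeleteFiles(SparkSession spark, Table table) throws Exception {
    // Spark SQL path, mirroring the queries in the tests above ("db.sample" is a placeholder).
    List<Row> deleteFiles = spark.sql("SELECT * FROM db.sample.all_delete_files").collectAsList();
    List<Row> allFiles = spark.sql("SELECT content, file_path FROM db.sample.all_files ORDER BY content")
        .collectAsList();
    System.out.println(deleteFiles.size() + " delete files, " + allFiles.size() + " files total");

    // Core API path: wrap the base table in its ALL_DELETE_FILES metadata table and plan a scan.
    Table allDeleteFiles = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.ALL_DELETE_FILES);
    try (CloseableIterable<FileScanTask> tasks = allDeleteFiles.newScan().planFiles()) {
      tasks.forEach(task -> System.out.println(task.file().path()));
    }
  }
}

Because all_files collects every manifest reachable from any tracked snapshot (Snapshot::allManifests), the same file can appear in more than one row, as the class javadoc warns.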