diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
index 77bd08952ea9..2bace1e5d53d 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
@@ -86,6 +86,8 @@ public VectorizedReader<?> message(
         reorderedFields.add(new VectorizedArrowReader.ConstantVectorReader<>(idToConstant.get(id)));
       } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
         reorderedFields.add(VectorizedArrowReader.positions());
+      } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
+        reorderedFields.add(new VectorizedArrowReader.ConstantVectorReader<>(false));
       } else if (reader != null) {
         reorderedFields.add(reader);
       } else {
diff --git a/core/src/main/java/org/apache/iceberg/MetadataColumns.java b/core/src/main/java/org/apache/iceberg/MetadataColumns.java
index 0761084dc215..a8eb2eb37821 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataColumns.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataColumns.java
@@ -36,6 +36,8 @@ private MetadataColumns() {
       Integer.MAX_VALUE - 1, "_file", Types.StringType.get(), "Path of the file in which a row is stored");
   public static final NestedField ROW_POSITION = NestedField.required(
       Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file");
+  public static final NestedField IS_DELETED = NestedField.required(
+      Integer.MAX_VALUE - 3, "_deleted", Types.BooleanType.get(), "Whether the row has been deleted");
 
   // IDs Integer.MAX_VALUE - (101-200) are used for reserved columns
   public static final NestedField DELETE_FILE_PATH = NestedField.required(
@@ -47,7 +49,8 @@ private MetadataColumns() {
 
   private static final Map<String, NestedField> META_COLUMNS = ImmutableMap.of(
       FILE_PATH.name(), FILE_PATH,
-      ROW_POSITION.name(), ROW_POSITION);
+      ROW_POSITION.name(), ROW_POSITION,
+      IS_DELETED.name(), IS_DELETED);
 
   private static final Set<Integer> META_IDS = META_COLUMNS.values().stream().map(NestedField::fieldId)
       .collect(ImmutableSet.toImmutableSet());
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index 5e72c9b1572b..6019e80db547 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -582,8 +582,7 @@ public abstract static class StructReader<S> implements ValueReader<S>, Supports
 
     protected StructReader(List<ValueReader<?>> readers, Schema schema) {
       this.readers = readers.toArray(new ValueReader[0]);
-      this.positions = new int[0];
-      this.constants = new Object[0];
+      Integer isDeletedColumnPos = null;
 
       List<Schema.Field> fields = schema.getFields();
       for (int pos = 0; pos < fields.size(); pos += 1) {
@@ -591,8 +590,18 @@ protected StructReader(List<ValueReader<?>> readers, Schema schema) {
         if (AvroSchemaUtil.getFieldId(field) == MetadataColumns.ROW_POSITION.fieldId()) {
           // track where the _pos field is located for setRowPositionSupplier
           this.posField = pos;
+        } else if (AvroSchemaUtil.getFieldId(field) == MetadataColumns.IS_DELETED.fieldId()) {
+          isDeletedColumnPos = pos;
         }
       }
+
+      if (isDeletedColumnPos == null) {
+        this.positions = new int[0];
+        this.constants = new Object[0];
+      } else {
+        this.positions = new int[]{isDeletedColumnPos};
+        this.constants = new Object[]{false};
+      }
     }
 
     protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
@@ -609,6 +618,9 @@ protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Ma
         } else if (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) {
           // track where the _pos field is located for setRowPositionSupplier
           this.posField = pos;
+        } else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) {
+          positionList.add(pos);
+          constantList.add(false);
         }
       }
 
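The `MetadataColumns` hunks above reserve field ID `Integer.MAX_VALUE - 3` for `_deleted` and register it next to `_file` and `_pos`, so the column can be projected by name like any other field. A minimal sketch of that behavior, assuming `iceberg-api` and `iceberg-core` on the classpath (the class name and the data field IDs 100/101 are illustrative only, not part of the patch):

```java
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.required;

public class IsDeletedProjectionSketch {
  public static void main(String[] args) {
    // A user schema plus the reserved _deleted metadata column.
    Schema projection = new Schema(
        required(100, "id", Types.LongType.get()),
        required(101, "data", Types.StringType.get()),
        MetadataColumns.IS_DELETED);

    // Reserved IDs count down from Integer.MAX_VALUE, clear of user field IDs.
    System.out.println(projection.findField("_deleted").fieldId()); // 2147483644
    System.out.println(MetadataColumns.isMetadataColumn("_deleted")); // true
  }
}
```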
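The Avro `StructReader` change takes a different route than the Parquet readers: instead of a dedicated constant reader, it records the `_deleted` slot in `positions` and pairs it with `false` in `constants`, and that pairing is replayed into every decoded row. A framework-free sketch of the mechanism (the class, `fillConstants`, and the `Object[]` row are illustrative stand-ins for the real reader, not Iceberg API):

```java
public class ConstantSlotsSketch {
  private final int[] positions;
  private final Object[] constants;

  ConstantSlotsSketch(int[] positions, Object[] constants) {
    this.positions = positions;
    this.constants = constants;
  }

  // Called once per decoded row, mirroring the final step of StructReader.read:
  // each listed slot is overwritten with its paired constant.
  void fillConstants(Object[] row) {
    for (int i = 0; i < positions.length; i += 1) {
      row[positions[i]] = constants[i];
    }
  }

  public static void main(String[] args) {
    // _deleted found at slot 2 of a three-column struct: constant false.
    ConstantSlotsSketch sketch = new ConstantSlotsSketch(new int[] {2}, new Object[] {false});
    Object[] row = {1L, "a", null};
    sketch.fillConstants(row);
    System.out.println(row[2]); // false
  }
}
```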
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index 7a0752975e80..a8eb13cdfa68 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -243,6 +243,8 @@ private static Schema fileProjection(Schema tableSchema, Schema requestedSchema,
       requiredIds.addAll(eqDelete.equalityFieldIds());
     }
 
+    requiredIds.add(MetadataColumns.IS_DELETED.fieldId());
+
     Set<Integer> missingIds = Sets.newLinkedHashSet(
         Sets.difference(requiredIds, TypeUtil.getProjectedIds(requestedSchema)));
 
@@ -253,7 +255,8 @@ private static Schema fileProjection(Schema tableSchema, Schema requestedSchema,
     // TODO: support adding nested columns. this will currently fail when finding nested columns to add
     List<Types.NestedField> columns = Lists.newArrayList(requestedSchema.columns());
     for (int fieldId : missingIds) {
-      if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
-        continue; // add _pos at the end
+      if (fieldId == MetadataColumns.ROW_POSITION.fieldId() ||
+          fieldId == MetadataColumns.IS_DELETED.fieldId()) {
+        continue; // add _pos and _deleted at the end
       }
 
       Types.NestedField field = tableSchema.asStruct().field(fieldId);
@@ -267,6 +269,10 @@ private static Schema fileProjection(Schema tableSchema, Schema requestedSchema,
       columns.add(MetadataColumns.ROW_POSITION);
     }
 
+    if (missingIds.contains(MetadataColumns.IS_DELETED.fieldId())) {
+      columns.add(MetadataColumns.IS_DELETED);
+    }
+
     return new Schema(columns);
   }
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 319568bb6e99..d7088b4700e5 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -118,6 +118,9 @@ public ParquetValueReader<RowData> struct(Types.StructType expected, GroupType s
       } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
         reorderedFields.add(ParquetValueReaders.position());
         types.add(null);
+      } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
+        reorderedFields.add(ParquetValueReaders.constant(false));
+        types.add(null);
       } else {
         ParquetValueReader<?> reader = readersById.get(id);
         if (reader != null) {
diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 5e1372da925d..d8c91b5a64ab 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -53,7 +53,6 @@
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.HashMultiset;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
@@ -191,9 +190,19 @@ public static void assertTableRows(Table table, List<RowData> expected) throws I
 
   public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
     table.refresh();
+
+    Types.StructType type = table.schema().asStruct();
+    StructLikeSet expectedSet = StructLikeSet.create(type);
+    expectedSet.addAll(expected);
+
     try (CloseableIterable<Record> iterable = IcebergGenerics.read(table).build()) {
-      Assert.assertEquals("Should produce the expected record",
-          HashMultiset.create(expected), HashMultiset.create(iterable));
+      StructLikeSet actualSet = StructLikeSet.create(type);
+
+      for (Record record : iterable) {
+        actualSet.add(record);
+      }
+
+      Assert.assertEquals("Should produce the expected record", expectedSet, actualSet);
     }
   }
 
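`DeleteFilter.fileProjection` now always demands `_deleted`, skips it (and `_pos`) inside the copy loop, and appends both after the data columns, so the metadata slots always sit at the tail of the file projection. A sketch of that tail step, assuming both columns are absent from the requested schema (`withMetadataColumns` is a hypothetical helper, not part of the patch):

```java
import java.util.List;

import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;

public class FileProjectionSketch {
  // Mirrors the tail of fileProjection: data columns first, then the
  // metadata columns that were missing from the request.
  public static Schema withMetadataColumns(Schema requested) {
    List<Types.NestedField> columns = Lists.newArrayList(requested.columns());
    columns.add(MetadataColumns.ROW_POSITION);
    columns.add(MetadataColumns.IS_DELETED);
    return new Schema(columns);
  }
}
```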
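The `SimpleDataUtil` change swaps `HashMultiset` for `StructLikeSet`, which compares rows field by field against a struct type rather than relying on a particular `Record` implementation's `Object.equals`. A minimal sketch of that comparison, assuming `iceberg-api`, `iceberg-core`, and `iceberg-data` on the classpath (the class name is illustrative):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeSet;

import static org.apache.iceberg.types.Types.NestedField.required;

public class StructLikeSetSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        required(1, "id", Types.LongType.get()),
        required(2, "data", Types.StringType.get()));

    GenericRecord record = GenericRecord.create(schema);
    record.setField("id", 1L);
    record.setField("data", "a");

    // The set compares by struct value, using the table's struct type.
    StructLikeSet set = StructLikeSet.create(schema.asStruct());
    set.add(record);

    // A second instance with the same field values matches by struct equality.
    GenericRecord same = GenericRecord.create(schema);
    same.setField("id", 1L);
    same.setField("data", "a");
    System.out.println(set.contains(same)); // true
  }
}
```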
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java
index 514e00977580..1be6889dbee1 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java
@@ -154,6 +154,9 @@ protected StructReader(List<OrcValueReader<?>> readers, Types.StructType struct,
       } else if (field.equals(MetadataColumns.ROW_POSITION)) {
         this.isConstantOrMetadataField[pos] = true;
         this.readers[pos] = new RowPositionReader();
+      } else if (field.equals(MetadataColumns.IS_DELETED)) {
+        this.isConstantOrMetadataField[pos] = true;
+        this.readers[pos] = constants(false);
       } else {
         this.readers[pos] = readers.get(readerIndex);
         readerIndex++;
diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
index 710c771036d4..e0377050ba10 100644
--- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -158,6 +158,9 @@ public ParquetValueReader<?> struct(Types.StructType expected, GroupType struct,
       } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
         reorderedFields.add(ParquetValueReaders.position());
         types.add(null);
+      } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
+        reorderedFields.add(ParquetValueReaders.constant(false));
+        types.add(null);
       } else {
         ParquetValueReader<?> reader = readersById.get(id);
         if (reader != null) {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index ec91e32e9906..8abee4a575e1 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -166,6 +166,9 @@ public ParquetValueReader<?> struct(Types.StructType expected, GroupType struct,
       } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
         reorderedFields.add(ParquetValueReaders.position());
         types.add(null);
+      } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
+        reorderedFields.add(ParquetValueReaders.constant(false));
+        types.add(null);
       } else {
         ParquetValueReader<?> reader = readersById.get(id);
         if (reader != null) {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
index c3c54996136e..33e1dac1235e 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
@@ -406,6 +406,8 @@ public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector v
           fieldVectors.add(new ConstantColumnVector(field.type(), batchSize, idToConstant.get(field.fieldId())));
         } else if (field.equals(MetadataColumns.ROW_POSITION)) {
           fieldVectors.add(new RowPositionColumnVector(batchOffsetInFile));
+        } else if (field.equals(MetadataColumns.IS_DELETED)) {
+          fieldVectors.add(new ConstantColumnVector(field.type(), batchSize, false));
         } else {
           fieldVectors.add(fieldConverters.get(vectorIndex)
               .convert(structVector.fields[vectorIndex], batchSize, batchOffsetInFile));
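The Parquet and ORC builders above all repeat one dispatch: reserved metadata IDs get synthetic readers before the file's own readers are consulted, and `_deleted` is wired as the constant `false` because a plain file scan has not applied delete files yet. A framework-free sketch of that pattern (`Reader`, `reorder`, and the stand-in IDs are illustrative, not Iceberg API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MetadataDispatchSketch {
  static final int ROW_POSITION_ID = Integer.MAX_VALUE - 2;
  static final int IS_DELETED_ID = Integer.MAX_VALUE - 3;

  interface Reader { Object read(long rowPosition); }

  static Reader constant(Object value) { return pos -> value; }
  static Reader position() { return pos -> pos; }

  // Mirrors the reorderedFields loop: reserved IDs get synthetic readers,
  // everything else resolves against the readers discovered in the file.
  static List<Reader> reorder(List<Integer> projectedIds, Map<Integer, Reader> readersById) {
    List<Reader> reordered = new ArrayList<>();
    for (int id : projectedIds) {
      if (id == ROW_POSITION_ID) {
        reordered.add(position());
      } else if (id == IS_DELETED_ID) {
        reordered.add(constant(false)); // nothing is deleted until delete files are applied
      } else {
        reordered.add(readersById.getOrDefault(id, constant(null)));
      }
    }
    return reordered;
  }
}
```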
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
index 410a5f1bd0d7..010f9f5d077c 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
@@ -69,7 +69,8 @@ public class TestSparkOrcReadMetadataColumns {
   private static final Schema PROJECTION_SCHEMA = new Schema(
       required(100, "id", Types.LongType.get()),
       required(101, "data", Types.StringType.get()),
-      MetadataColumns.ROW_POSITION
+      MetadataColumns.ROW_POSITION,
+      MetadataColumns.IS_DELETED
   );
 
   private static final int NUM_ROWS = 1000;
@@ -91,6 +92,7 @@ public class TestSparkOrcReadMetadataColumns {
       row.update(0, i);
       row.update(1, UTF8String.fromString("str" + i));
       row.update(2, i);
+      row.update(3, false);
       EXPECTED_ROWS.add(row);
     }
   }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
index 8f15db724a3b..b68db024aab3 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
@@ -71,7 +71,8 @@ public class TestSparkParquetReadMetadataColumns {
   private static final Schema PROJECTION_SCHEMA = new Schema(
       required(100, "id", Types.LongType.get()),
       required(101, "data", Types.StringType.get()),
-      MetadataColumns.ROW_POSITION
+      MetadataColumns.ROW_POSITION,
+      MetadataColumns.IS_DELETED
   );
 
   private static final int NUM_ROWS = 1000;
@@ -104,6 +105,7 @@ public class TestSparkParquetReadMetadataColumns {
       }
       row.update(1, UTF8String.fromString("str" + i));
       row.update(2, i);
+      row.update(3, false);
       EXPECTED_ROWS.add(row);
     }
   }
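Both test classes widen `PROJECTION_SCHEMA` by one column, so every expected row gains a fourth slot holding `false`. A sketch of the row construction the tests repeat, assuming `spark-catalyst` on the classpath (`expectedRow` is a hypothetical helper; the slot layout matches the four projected columns above):

```java
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;

public class ExpectedRowSketch {
  public static GenericInternalRow expectedRow(long id, long pos) {
    GenericInternalRow row = new GenericInternalRow(4);
    row.update(0, id);                                // id
    row.update(1, UTF8String.fromString("str" + id)); // data
    row.update(2, pos);                               // _pos
    row.update(3, false);                             // _deleted: constant until deletes apply
    return row;
  }
}
```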