@@ -86,6 +86,8 @@ public VectorizedReader<?> message(
reorderedFields.add(new VectorizedArrowReader.ConstantVectorReader<>(idToConstant.get(id)));
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(VectorizedArrowReader.positions());
} else if (id == MetadataColumns.IS_DELETED.fieldId()) {
reorderedFields.add(new VectorizedArrowReader.ConstantVectorReader<>(false));
} else if (reader != null) {
reorderedFields.add(reader);
} else {
5 changes: 4 additions & 1 deletion core/src/main/java/org/apache/iceberg/MetadataColumns.java
@@ -36,6 +36,8 @@ private MetadataColumns() {
Integer.MAX_VALUE - 1, "_file", Types.StringType.get(), "Path of the file in which a row is stored");
public static final NestedField ROW_POSITION = NestedField.required(
Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file");
public static final NestedField IS_DELETED = NestedField.required(
Integer.MAX_VALUE - 3, "_deleted", Types.BooleanType.get(), "Whether the row has been deleted");

// IDs Integer.MAX_VALUE - (101-200) are used for reserved columns
public static final NestedField DELETE_FILE_PATH = NestedField.required(
@@ -47,7 +49,8 @@ private MetadataColumns() {

private static final Map<String, NestedField> META_COLUMNS = ImmutableMap.of(
FILE_PATH.name(), FILE_PATH,
ROW_POSITION.name(), ROW_POSITION);
ROW_POSITION.name(), ROW_POSITION,
IS_DELETED.name(), IS_DELETED);

private static final Set<Integer> META_IDS = META_COLUMNS.values().stream().map(NestedField::fieldId)
.collect(ImmutableSet.toImmutableSet());
16 changes: 14 additions & 2 deletions core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -582,17 +582,26 @@ public abstract static class StructReader<S> implements ValueReader<S>, Supports

protected StructReader(List<ValueReader<?>> readers, Schema schema) {
this.readers = readers.toArray(new ValueReader[0]);
this.positions = new int[0];
this.constants = new Object[0];
Integer isDeletedColumnPos = null;

List<Schema.Field> fields = schema.getFields();
for (int pos = 0; pos < fields.size(); pos += 1) {
Schema.Field field = fields.get(pos);
if (AvroSchemaUtil.getFieldId(field) == MetadataColumns.ROW_POSITION.fieldId()) {
// track where the _pos field is located for setRowPositionSupplier
this.posField = pos;
} else if (AvroSchemaUtil.getFieldId(field) == MetadataColumns.IS_DELETED.fieldId()) {
isDeletedColumnPos = pos;
}
}

if (isDeletedColumnPos == null) {
this.positions = new int[0];
this.constants = new Object[0];
} else {
this.positions = new int[]{isDeletedColumnPos};
this.constants = new Object[]{false};
}
}

protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
@@ -609,6 +618,9 @@ protected StructReader(List<ValueReader<?>> readers, Types.StructType struct, Ma
} else if (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) {
// track where the _pos field is located for setRowPositionSupplier
this.posField = pos;
} else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) {
positionList.add(pos);
constantList.add(false);
}
}

10 changes: 8 additions & 2 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -243,6 +243,8 @@ private static Schema fileProjection(Schema tableSchema, Schema requestedSchema,
requiredIds.addAll(eqDelete.equalityFieldIds());
}

requiredIds.add(MetadataColumns.IS_DELETED.fieldId());

Set<Integer> missingIds = Sets.newLinkedHashSet(
Sets.difference(requiredIds, TypeUtil.getProjectedIds(requestedSchema)));

@@ -253,8 +255,8 @@ private static Schema fileProjection(Schema tableSchema, Schema requestedSchema,
// TODO: support adding nested columns. this will currently fail when finding nested columns to add
List<Types.NestedField> columns = Lists.newArrayList(requestedSchema.columns());
for (int fieldId : missingIds) {
if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
continue; // add _pos at the end
if (fieldId == MetadataColumns.ROW_POSITION.fieldId() || fieldId == MetadataColumns.IS_DELETED.fieldId()) {
continue; // add _pos and _deleted at the end
}

Types.NestedField field = tableSchema.asStruct().field(fieldId);
@@ -267,6 +269,10 @@ private static Schema fileProjection(Schema tableSchema, Schema requestedSchema,
columns.add(MetadataColumns.ROW_POSITION);
}

if (missingIds.contains(MetadataColumns.IS_DELETED.fieldId())) {
columns.add(MetadataColumns.IS_DELETED);
}

return new Schema(columns);
}
}
@@ -118,6 +118,9 @@ public ParquetValueReader<RowData> struct(Types.StructType expected, GroupType s
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(ParquetValueReaders.position());
types.add(null);
} else if (id == MetadataColumns.IS_DELETED.fieldId()) {
reorderedFields.add(ParquetValueReaders.constant(false));
types.add(null);
} else {
ParquetValueReader<?> reader = readersById.get(id);
if (reader != null) {
15 changes: 12 additions & 3 deletions flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -53,7 +53,6 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.HashMultiset;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
@@ -191,9 +190,19 @@ public static void assertTableRows(Table table, List<RowData> expected) throws I

public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
table.refresh();

Contributor:
Why do we need this change? Is it because the two sets don't match after we add the additional metadata column? But wouldn't _pos and _file also have this issue?

Collaborator Author @chenjunjiedada (Apr 30, 2021):
Because the metadata column projection logic produces additional columns even when the requestedSchema doesn't contain them, which is why we use StructLikeSet. The _pos column shows up only when positional deletes exist; the _deleted marker shows up when any deletes exist.

The failing unit test contains only an equality delete, which produces only the _deleted column, so it failed with the HashMultiset comparison. But when a unit test contains a positional delete, for example TestIcebergFilesCommitter.TestCommitTwoCheckpointsInSingleTxn, it fails as well because of the additional _pos column. The following patch to the unit test demonstrates this:

-      DeleteFile deleteFile1 = writeEqDeleteFile(appenderFactory, "delete-file-1", ImmutableList.of(delete3));
+      DeleteFile deleteFile1 = writePosDeleteFile(appenderFactory,
+          "pos-delete-file-1",
+          ImmutableList.of(Pair.of(dataFile1.path(), 3L)));
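
As a rough illustration, assuming a hypothetical (id, data) table (the column names and field ids below are made up), the projected read schema would look roughly like this:

// assumes org.apache.iceberg.MetadataColumns, Schema, Types and a static import of Types.NestedField.required
Schema requestedSchema = new Schema(
    required(1, "id", Types.IntegerType.get()),
    required(2, "data", Types.StringType.get()));

// only equality deletes: _deleted is appended after the requested columns
Schema eqDeleteProjection = new Schema(
    required(1, "id", Types.IntegerType.get()),
    required(2, "data", Types.StringType.get()),
    MetadataColumns.IS_DELETED);

// positional deletes present: _pos is also required, so both metadata columns are appended
Schema posDeleteProjection = new Schema(
    required(1, "id", Types.IntegerType.get()),
    required(2, "data", Types.StringType.get()),
    MetadataColumns.ROW_POSITION,
    MetadataColumns.IS_DELETED);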

Contributor:
I also find this suspicious. Is the extra column in the expected records or the table? I don't think that this PR should change the data produced by IcebergGenerics.read(table).build().

Collaborator Author:
It is in the expected records. The extra column is added in DeleteFilter#fileProjection.

private static Schema fileProjection(Schema tableSchema, Schema requestedSchema,
                                       List<DeleteFile> posDeletes, List<DeleteFile> eqDeletes) {
    ...
    // We add it to requiredIds, so that it exists in missingIds when requestedSchema doesn't contain it.
    if (!posDeletes.isEmpty()) {
      requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
    }
    ...

    // We append it at the end anyway.
    if (missingIds.contains(MetadataColumns.ROW_POSITION.fieldId())) {
      columns.add(MetadataColumns.ROW_POSITION);
    }

    return new Schema(columns);
  }
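
For reference, a minimal sketch (assumptions: a simple two-column table schema and made-up values, not code from this PR) of why the StructLikeSet comparison tolerates the extra column: StructLikeSet.create(type) compares records field-by-field against that struct type, so a trailing _deleted value is ignored.

// Hypothetical standalone example; assumes the Iceberg core and data modules are on the classpath.
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeSet;

import static org.apache.iceberg.types.Types.NestedField.required;

public class StructLikeSetSketch {
  public static void main(String[] args) {
    Schema tableSchema = new Schema(
        required(1, "id", Types.LongType.get()),
        required(2, "data", Types.StringType.get()));

    // expected records carry only the table columns
    GenericRecord expected = GenericRecord.create(tableSchema);
    expected.setField("id", 1L);
    expected.setField("data", "a");

    // a record read back through DeleteFilter may carry the extra _deleted column
    Schema readSchema = new Schema(
        required(1, "id", Types.LongType.get()),
        required(2, "data", Types.StringType.get()),
        MetadataColumns.IS_DELETED);
    GenericRecord actual = GenericRecord.create(readSchema);
    actual.setField("id", 1L);
    actual.setField("data", "a");
    actual.setField("_deleted", false);

    // the set compares records against tableSchema's struct type, so the
    // trailing _deleted value does not affect membership
    StructLikeSet expectedSet = StructLikeSet.create(tableSchema.asStruct());
    expectedSet.add(expected);
    System.out.println(expectedSet.contains(actual));  // expected: true
  }
}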

Types.StructType type = table.schema().asStruct();
StructLikeSet expectedSet = StructLikeSet.create(type);
expectedSet.addAll(expected);

try (CloseableIterable<Record> iterable = IcebergGenerics.read(table).build()) {
Assert.assertEquals("Should produce the expected record",
HashMultiset.create(expected), HashMultiset.create(iterable));
StructLikeSet actualSet = StructLikeSet.create(type);

for (Record record : iterable) {
actualSet.add(record);
}

Assert.assertEquals("Should produce the expected record", expectedSet, actualSet);
}
}

3 changes: 3 additions & 0 deletions orc/src/main/java/org/apache/iceberg/orc/OrcValueReaders.java
@@ -154,6 +154,9 @@ protected StructReader(List<OrcValueReader<?>> readers, Types.StructType struct,
} else if (field.equals(MetadataColumns.ROW_POSITION)) {
this.isConstantOrMetadataField[pos] = true;
this.readers[pos] = new RowPositionReader();
} else if (field.equals(MetadataColumns.IS_DELETED)) {
this.isConstantOrMetadataField[pos] = true;
this.readers[pos] = constants(false);
} else {
this.readers[pos] = readers.get(readerIndex);
readerIndex++;
@@ -158,6 +158,9 @@ public ParquetValueReader<?> struct(Types.StructType expected, GroupType struct,
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(ParquetValueReaders.position());
types.add(null);
} else if (id == MetadataColumns.IS_DELETED.fieldId()) {
reorderedFields.add(ParquetValueReaders.constant(false));
types.add(null);
} else {
ParquetValueReader<?> reader = readersById.get(id);
if (reader != null) {
@@ -166,6 +166,9 @@ public ParquetValueReader<?> struct(Types.StructType expected, GroupType struct,
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(ParquetValueReaders.position());
types.add(null);
} else if (id == MetadataColumns.IS_DELETED.fieldId()) {
reorderedFields.add(ParquetValueReaders.constant(false));
types.add(null);
} else {
ParquetValueReader<?> reader = readersById.get(id);
if (reader != null) {
@@ -406,6 +406,8 @@ public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector v
fieldVectors.add(new ConstantColumnVector(field.type(), batchSize, idToConstant.get(field.fieldId())));
} else if (field.equals(MetadataColumns.ROW_POSITION)) {
fieldVectors.add(new RowPositionColumnVector(batchOffsetInFile));
} else if (field.equals(MetadataColumns.IS_DELETED)) {
fieldVectors.add(new ConstantColumnVector(field.type(), batchSize, false));
} else {
fieldVectors.add(fieldConverters.get(vectorIndex)
.convert(structVector.fields[vectorIndex], batchSize, batchOffsetInFile));
@@ -69,7 +69,8 @@ public class TestSparkOrcReadMetadataColumns {
private static final Schema PROJECTION_SCHEMA = new Schema(
required(100, "id", Types.LongType.get()),
required(101, "data", Types.StringType.get()),
MetadataColumns.ROW_POSITION
MetadataColumns.ROW_POSITION,
MetadataColumns.IS_DELETED
);

private static final int NUM_ROWS = 1000;
@@ -91,6 +92,7 @@ public class TestSparkOrcReadMetadataColumns {
row.update(0, i);
row.update(1, UTF8String.fromString("str" + i));
row.update(2, i);
row.update(3, false);
EXPECTED_ROWS.add(row);
}
}
@@ -71,7 +71,8 @@ public class TestSparkParquetReadMetadataColumns {
private static final Schema PROJECTION_SCHEMA = new Schema(
required(100, "id", Types.LongType.get()),
required(101, "data", Types.StringType.get()),
MetadataColumns.ROW_POSITION
MetadataColumns.ROW_POSITION,
MetadataColumns.IS_DELETED
);

private static final int NUM_ROWS = 1000;
@@ -104,6 +105,7 @@ public class TestSparkParquetReadMetadataColumns {
}
row.update(1, UTF8String.fromString("str" + i));
row.update(2, i);
row.update(3, false);
EXPECTED_ROWS.add(row);
}
}