@@ -34,7 +34,6 @@
import org.apache.flink.table.data.RowData;
Contributor: The PR needs to go against the 2.1 target (currently: 2.0), as #13714 has been merged.

Contributor Author: Can we focus on the 2.0.0 version for this PR first, and then backport it to 2.1.0 and 1.20 later?

Contributor: I'm fine with both.

import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
@@ -90,10 +89,13 @@ public ParquetValueReader<RowData> message(
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public ParquetValueReader<RowData> struct(
Types.StructType expected, GroupType struct, List<ParquetValueReader<?>> fieldReaders) {

if (null == expected) {
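// no projected struct type for this group, so return a reader that produces rows with no fields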
return new RowDataReader(ImmutableList.of());
}

// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
Map<Integer, Type> typesById = Maps.newHashMap();
Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i);
@@ -102,51 +104,39 @@ public ParquetValueReader<RowData> struct(
if (fieldType.getId() != null) {
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
typesById.put(id, fieldType);
if (idToConstant.containsKey(id)) {
maxDefinitionLevelsById.put(id, fieldD);
}
}
}
}

List<Types.NestedField> expectedFields =
expected != null ? expected.fields() : ImmutableList.of();
int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
List<Types.NestedField> expectedFields = expected.fields();
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
// Defaulting to parent max definition level
int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
ParquetValueReader<?> reader = readersById.get(id);
if (idToConstant.containsKey(id)) {
// containsKey is used because the constant may be null
int fieldMaxDefinitionLevel =
maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
reorderedFields.add(
ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(ParquetValueReaders.position());
} else if (id == MetadataColumns.IS_DELETED.fieldId()) {
reorderedFields.add(ParquetValueReaders.constant(false));
} else if (reader != null) {
reorderedFields.add(reader);
} else if (field.initialDefault() != null) {
reorderedFields.add(
ParquetValueReaders.constant(
RowDataUtil.convertConstant(field.type(), field.initialDefault()),
maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel)));
} else if (field.isOptional()) {
reorderedFields.add(ParquetValueReaders.nulls());
} else {
throw new IllegalArgumentException(
String.format("Missing required field: %s", field.name()));
}
ParquetValueReader<?> reader =
ParquetValueReaders.replaceWithMetadataReader(
id, readersById.get(id), idToConstant, constantDefinitionLevel);
reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel));
Contributor (on lines -121 to +120): Just curious, how did you decide to make this refactoring? It consolidates several branches of the if statement.

Contributor Author: In #12836, the duplicated code shared by Spark and core was consolidated into the core layer. The Flink code is the same, so I reused it here.
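
For context, a rough sketch of what the shared helper needs to cover here, based on the branches it replaces in this hunk (the actual ParquetValueReaders.replaceWithMetadataReader lives in the core parquet module and may differ in detail):

  // Sketch only, not the core implementation: constants win first, then the _pos and
  // _deleted metadata columns; otherwise the regular column reader (possibly null) is kept.
  static ParquetValueReader<?> replaceWithMetadataReaderSketch(
      int id, ParquetValueReader<?> reader, Map<Integer, ?> idToConstant, int constantDL) {
    if (idToConstant.containsKey(id)) {
      // containsKey is used because the constant value itself may be null
      return ParquetValueReaders.constant(idToConstant.get(id), constantDL);
    } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
      return ParquetValueReaders.position();
    } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
      return ParquetValueReaders.constant(false);
    }
    return reader;
  }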

}

return new RowDataReader(reorderedFields);
}

private ParquetValueReader<?> defaultReader(
Types.NestedField field, ParquetValueReader<?> reader, int constantDL) {
if (reader != null) {
return reader;
} else if (field.initialDefault() != null) {
return ParquetValueReaders.constant(
RowDataUtil.convertConstant(field.type(), field.initialDefault()), constantDL);
} else if (field.isOptional()) {
return ParquetValueReaders.nulls();
}

throw new IllegalArgumentException(String.format("Missing required field: %s", field.name()));
}

@Override
public ParquetValueReader<?> list(
Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
@@ -57,6 +57,7 @@
import org.apache.flink.types.Row;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericDataUtil;
@@ -176,6 +177,16 @@ public static void assertRowData(
LogicalType rowType,
StructLike expectedRecord,
RowData actualRowData) {
assertRowData(structType, rowType, expectedRecord, actualRowData, null, -1);
}

public static void assertRowData(
Types.StructType structType,
LogicalType rowType,
StructLike expectedRecord,
RowData actualRowData,
Map<Integer, Object> idToConstant,
int rowPosition) {
if (expectedRecord == null && actualRowData == null) {
return;
}
@@ -197,21 +208,14 @@ public static void assertRowData(
LogicalType logicalType = ((RowType) rowType).getTypeAt(pos);
Object actualValue =
FlinkRowData.createFieldGetter(logicalType, pos).getFieldOrNull(actualRowData);
if (expectedField != null) {
assertEquals(
field.type(), logicalType, expected.getField(expectedField.name()), actualValue);
} else {
// convert the initial value to generic because that is the data model used to generate
// the expected records
assertEquals(
field.type(),
logicalType,
GenericDataUtil.internalToGeneric(field.type(), field.initialDefault()),
actualValue);
}
pos += 1;
}
Object expectedValue =
expectedField != null
? getExpectedValue(idToConstant, rowPosition, expectedField, expected)
: GenericDataUtil.internalToGeneric(field.type(), field.initialDefault());

assertEquals(field.type(), logicalType, expectedValue, actualValue);
pos++;
}
} else {
for (int i = 0; i < types.size(); i += 1) {
LogicalType logicalType = ((RowType) rowType).getTypeAt(i);
@@ -223,6 +227,30 @@ public static void assertRowData(
}
}

private static Object getExpectedValue(
Map<Integer, Object> idToConstant,
int pos,
Types.NestedField expectedField,
Record expected) {
Object expectedValue;
int id = expectedField.fieldId();
if (id == MetadataColumns.ROW_ID.fieldId()) {
expectedValue = expected.getField(expectedField.name());
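// if _row_id was not materialized in the record, derive it from the constant (first row id) plus the row position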
if (expectedValue == null && idToConstant != null) {
expectedValue = (Long) idToConstant.get(id) + pos;
}
} else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) {
expectedValue = expected.getField(expectedField.name());
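// if _last_updated_sequence_number was not materialized, fall back to the constant from idToConstant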
if (expectedValue == null && idToConstant != null) {
expectedValue = idToConstant.get(id);
}
} else {
expectedValue = expected.getField(expectedField.name());
}

return expectedValue;
}

private static void assertEquals(
Type type, LogicalType logicalType, Object expected, Object actual) {

@@ -42,8 +42,10 @@
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.inmemory.InMemoryOutputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -74,6 +76,11 @@ protected boolean supportsTimestampNanos() {
return true;
}

@Override
protected boolean supportsRowLineage() {
return true;
}

@Test
public void testBuildReader() {
MessageType fileSchema =
@@ -216,29 +223,31 @@ public void testTwoLevelList() throws IOException {

private void writeAndValidate(
Iterable<Record> iterable, Schema writeSchema, Schema expectedSchema) throws IOException {
File testFile = File.createTempFile("junit", null, temp.toFile());
assertThat(testFile.delete()).isTrue();

OutputFile output = new InMemoryOutputFile();
try (FileAppender<Record> writer =
Parquet.write(Files.localOutput(testFile))
Contributor: What's the reason for moving from a local file to in-memory?

Contributor Author: In #12836, Spark and core changed this behavior, so I aligned this part with them.

Parquet.write(output)
.schema(writeSchema)
.createWriterFunc(GenericParquetWriter::create)
.build()) {
writer.addAll(iterable);
}

try (CloseableIterable<RowData> reader =
Parquet.read(Files.localInput(testFile))
Parquet.read(output.toInputFile())
.project(expectedSchema)
.createReaderFunc(type -> FlinkParquetReaders.buildReader(expectedSchema, type))
.createReaderFunc(
type -> FlinkParquetReaders.buildReader(expectedSchema, type, ID_TO_CONSTANT))
.build()) {
Iterator<Record> expected = iterable.iterator();
Iterator<RowData> rows = reader.iterator();
LogicalType rowType = FlinkSchemaUtil.convert(writeSchema);
for (int i = 0; i < NUM_RECORDS; i += 1) {
int pos = 0;
for (Record record : iterable) {
assertThat(rows).hasNext();
TestHelpers.assertRowData(writeSchema.asStruct(), rowType, expected.next(), rows.next());
TestHelpers.assertRowData(
writeSchema.asStruct(), rowType, record, rows.next(), ID_TO_CONSTANT, pos++);
}

assertThat(rows).isExhausted();
}
}
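
For readability, the updated writeAndValidate assembled from the new lines in the hunk above (a reconstruction for illustration; the file itself is authoritative):

  private void writeAndValidate(
      Iterable<Record> iterable, Schema writeSchema, Schema expectedSchema) throws IOException {
    // write the records to an in-memory Parquet file instead of a temp file on disk
    OutputFile output = new InMemoryOutputFile();
    try (FileAppender<Record> writer =
        Parquet.write(output)
            .schema(writeSchema)
            .createWriterFunc(GenericParquetWriter::create)
            .build()) {
      writer.addAll(iterable);
    }

    // read it back with the Flink reader, passing the constants map so metadata and
    // constant columns can be checked against ID_TO_CONSTANT
    try (CloseableIterable<RowData> reader =
        Parquet.read(output.toInputFile())
            .project(expectedSchema)
            .createReaderFunc(
                type -> FlinkParquetReaders.buildReader(expectedSchema, type, ID_TO_CONSTANT))
            .build()) {
      Iterator<RowData> rows = reader.iterator();
      LogicalType rowType = FlinkSchemaUtil.convert(writeSchema);
      int pos = 0;
      for (Record record : iterable) {
        assertThat(rows).hasNext();
        TestHelpers.assertRowData(
            writeSchema.asStruct(), rowType, record, rows.next(), ID_TO_CONSTANT, pos++);
      }

      assertThat(rows).isExhausted();
    }
  }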