@@ -34,7 +34,6 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
@@ -90,10 +89,13 @@ public ParquetValueReader<RowData> message(
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public ParquetValueReader<RowData> struct(
Types.StructType expected, GroupType struct, List<ParquetValueReader<?>> fieldReaders) {

if (null == expected) {
return new RowDataReader(ImmutableList.of());
}

// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
Map<Integer, Type> typesById = Maps.newHashMap();
Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i);
@@ -102,51 +104,39 @@ public ParquetValueReader<RowData> struct(
if (fieldType.getId() != null) {
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
typesById.put(id, fieldType);
if (idToConstant.containsKey(id)) {
maxDefinitionLevelsById.put(id, fieldD);
}
}
}
}

List<Types.NestedField> expectedFields =
expected != null ? expected.fields() : ImmutableList.of();
int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
List<Types.NestedField> expectedFields = expected.fields();
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
// Defaulting to parent max definition level
int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
ParquetValueReader<?> reader = readersById.get(id);
if (idToConstant.containsKey(id)) {
// containsKey is used because the constant may be null
int fieldMaxDefinitionLevel =
maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
reorderedFields.add(
ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(ParquetValueReaders.position());
} else if (id == MetadataColumns.IS_DELETED.fieldId()) {
reorderedFields.add(ParquetValueReaders.constant(false));
} else if (reader != null) {
reorderedFields.add(reader);
} else if (field.initialDefault() != null) {
reorderedFields.add(
ParquetValueReaders.constant(
RowDataUtil.convertConstant(field.type(), field.initialDefault()),
maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel)));
} else if (field.isOptional()) {
reorderedFields.add(ParquetValueReaders.nulls());
} else {
throw new IllegalArgumentException(
String.format("Missing required field: %s", field.name()));
}
ParquetValueReader<?> reader =
ParquetValueReaders.replaceWithMetadataReader(
id, readersById.get(id), idToConstant, constantDefinitionLevel);
reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel));
}

return new RowDataReader(reorderedFields);
}

private ParquetValueReader<?> defaultReader(
Types.NestedField field, ParquetValueReader<?> reader, int constantDL) {
if (reader != null) {
return reader;
} else if (field.initialDefault() != null) {
return ParquetValueReaders.constant(
RowDataUtil.convertConstant(field.type(), field.initialDefault()), constantDL);
} else if (field.isOptional()) {
return ParquetValueReaders.nulls();
}

throw new IllegalArgumentException(String.format("Missing required field: %s", field.name()));
}

@Override
public ParquetValueReader<?> list(
Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
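Review note: the per-field branches deleted above (partition constants from `idToConstant`, `_pos`, `_deleted`) are now folded into `ParquetValueReaders.replaceWithMetadataReader`, and `defaultReader` keeps only the fallback cases. A minimal stand-in sketch of the resulting resolution order, assuming the helper mirrors the inline code it replaces (the types below are illustrative, not the Iceberg API):

```java
import java.util.Map;

// Stand-in model of the per-field resolution order implied by this refactor.
class FieldReaderResolutionSketch {
  record Reader(String describe) {}

  static Reader resolve(
      int fieldId,
      Reader fileReader,                  // null when the column is absent from the data file
      Map<Integer, Object> idToConstant,  // partition/metadata constants for the current task
      Object initialDefault,              // the field's initial default, if any
      boolean optional) {
    // 1. replaceWithMetadataReader: constants win (and, per the removed branches,
    //    _pos and _deleted would get their dedicated readers here).
    if (idToConstant.containsKey(fieldId)) {
      return new Reader("constant(" + idToConstant.get(fieldId) + ")");
    }
    // 2. a column present in the data file keeps the reader built from the file schema
    if (fileReader != null) {
      return fileReader;
    }
    // 3. defaultReader(): the initial default becomes a constant reader ...
    if (initialDefault != null) {
      return new Reader("constant(default=" + initialDefault + ")");
    }
    // 4. ... optional fields with no source read nulls ...
    if (optional) {
      return new Reader("nulls()");
    }
    // 5. ... and a required field with no source is an error.
    throw new IllegalArgumentException("Missing required field: " + fieldId);
  }
}
```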
@@ -29,8 +29,6 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
import org.apache.iceberg.actions.FileRewritePlan;
import org.apache.iceberg.actions.RewriteDataFiles;
@@ -93,10 +91,6 @@ public DataFileRewritePlanner(
@Override
public void open(OpenContext context) throws Exception {
tableLoader.open();
Table table = tableLoader.loadTable();
Preconditions.checkArgument(
!TableUtil.supportsRowLineage(table),
"Flink does not support compaction on row lineage enabled tables (V3+)");
this.errorCounter =
TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex)
.counter(TableMaintenanceMetrics.ERROR_COUNTER);
@@ -28,11 +28,15 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.actions.RewriteFileGroup;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup;
@@ -108,8 +112,10 @@ public void processElement(PlannedGroup value, Context ctx, Collector<ExecutedGr
value.group().rewrittenFiles().size());
}

try (TaskWriter<RowData> writer = writerFor(value)) {
try (DataIterator<RowData> iterator = readerFor(value)) {
boolean preserveRowId = TableUtil.supportsRowLineage(value.table());

try (TaskWriter<RowData> writer = writerFor(value, preserveRowId)) {
try (DataIterator<RowData> iterator = readerFor(value, preserveRowId)) {
while (iterator.hasNext()) {
writer.write(iterator.next());
}
@@ -169,30 +175,42 @@ public void processElement(PlannedGroup value, Context ctx, Collector<ExecutedGr
}
}

private TaskWriter<RowData> writerFor(PlannedGroup value) {
private TaskWriter<RowData> writerFor(PlannedGroup value, boolean preserveRowId) {
String formatString =
PropertyUtil.propertyAsString(
value.table().properties(),
TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
Schema writeSchema =
preserveRowId
? MetadataColumns.schemaWithRowLineage(value.table().schema())
: value.table().schema();
RowType flinkWriteType = FlinkSchemaUtil.convert(writeSchema);
RowDataTaskWriterFactory factory =
new RowDataTaskWriterFactory(
value.table(),
FlinkSchemaUtil.convert(value.table().schema()),
value::table,
flinkWriteType,
value.group().inputSplitSize(),
FileFormat.fromString(formatString),
value.table().properties(),
null,
false);
false,
writeSchema,
value.table().spec());
factory.initialize(subTaskId, attemptId);
return factory.create();
}
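
The write-schema switch above is what lets rewritten files carry lineage values forward. A rough sketch of the schema it produces, assuming `MetadataColumns.schemaWithRowLineage` appends the `_row_id` and `_last_updated_sequence_number` long columns (consistent with `SCHEMA3` added to `SimpleDataUtil` below):

```java
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Sketch: the writer schema used when preserveRowId is true.
class WriteSchemaSketch {
  public static void main(String[] args) {
    Schema tableSchema =
        new Schema(
            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    // Same call as writerFor(): the lineage metadata columns are appended so the
    // rewritten files can store each row's _row_id and _last_updated_sequence_number
    // instead of dropping them.
    Schema writeSchema = MetadataColumns.schemaWithRowLineage(tableSchema);
    System.out.println(writeSchema);
  }
}
```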

private DataIterator<RowData> readerFor(PlannedGroup value) {
private DataIterator<RowData> readerFor(PlannedGroup value, boolean preserveRowId) {
Schema projectedSchema =
preserveRowId
? MetadataColumns.schemaWithRowLineage(value.table().schema())
: value.table().schema();

RowDataFileScanTaskReader reader =
new RowDataFileScanTaskReader(
value.table().schema(),
value.table().schema(),
projectedSchema,
PropertyUtil.propertyAsString(value.table().properties(), DEFAULT_NAME_MAPPING, null),
false,
Collections.emptyList());
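Taken together, a single `TableUtil.supportsRowLineage` check drives both sides of the rewrite, so lineage values read from the old files are handed to the writer unchanged. A condensed sketch of that shared decision:

```java
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;

// Condensed view of the preserveRowId decision used by writerFor() and readerFor().
class LineageSchemaSketch {
  static Schema schemaFor(Table table) {
    return TableUtil.supportsRowLineage(table)
        ? MetadataColumns.schemaWithRowLineage(table.schema())
        : table.schema();
  }
}
```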
@@ -46,7 +46,6 @@
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;
@@ -84,13 +83,7 @@ public RowDataFileScanTaskReader(
@Override
public CloseableIterator<RowData> open(
FileScanTask task, InputFilesDecryptor inputFilesDecryptor) {
Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds());

Map<Integer, ?> idToConstant =
partitionSchema.columns().isEmpty()
? ImmutableMap.of()
: PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);

Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
FlinkDeleteFilter deletes =
new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor);
CloseableIterable<RowData> iterable =
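Dropping the empty-partition short-circuit means the constants map is built for every task, not only partitioned ones; the reader changes above rely on `idToConstant` for metadata columns as well (the test helper below looks up the row-lineage bases through the same map). Purely illustrative contents of such a map, with a made-up partition field id:

```java
import java.util.Map;
import org.apache.iceberg.MetadataColumns;

// Hypothetical idToConstant contents for one file scan task; the values are not
// computed here, they just illustrate what the readers can pull from the map.
class ConstantsMapSketch {
  static Map<Integer, Object> exampleConstants() {
    return Map.of(
        1000, "2024-01-01",                                          // identity partition value (made-up field id)
        MetadataColumns.ROW_ID.fieldId(), 100L,                      // first row id of the file
        MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), 7L); // sequence-number constant
  }
}
```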
@@ -90,6 +90,13 @@ private SimpleDataUtil() {}
Types.NestedField.optional(2, "data", Types.StringType.get()),
Types.NestedField.optional(3, "extra", Types.StringType.get()));

public static final Schema SCHEMA3 =
new Schema(
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()),
Types.NestedField.optional(3, "_row_id", Types.LongType.get()),
Types.NestedField.optional(4, "_last_updated_sequence_number", Types.LongType.get()));

public static final ResolvedSchema FLINK_SCHEMA =
ResolvedSchema.of(
Column.physical("id", DataTypes.INT()), Column.physical("data", DataTypes.STRING()));
@@ -101,6 +108,7 @@ private SimpleDataUtil() {}

public static final Record RECORD = GenericRecord.create(SCHEMA);
public static final Record RECORD2 = GenericRecord.create(SCHEMA2);
public static final Record RECORD3 = GenericRecord.create(SCHEMA3);

public static Table createTable(
String path, Map<String, String> properties, boolean partitioned) {
@@ -128,6 +136,16 @@ public static Record createRecord(Integer id, String data, String extra) {
return record;
}

public static Record createRecordWithRowId(
Integer id, String data, Long rowId, Long lastUpdatedSequenceNumber) {
Record record = RECORD3.copy();
record.setField("id", id);
record.setField("data", data);
record.setField("_row_id", rowId);
record.setField("_last_updated_sequence_number", lastUpdatedSequenceNumber);
return record;
}

public static RowData createRowData(Integer id, String data) {
return GenericRowData.of(id, StringData.fromString(data));
}
@@ -319,6 +337,11 @@ public static void assertTableRecords(Table table, List<Record> expected) throws

public static void assertTableRecords(Table table, List<Record> expected, String branch)
throws IOException {
assertTableRecords(table, expected, branch, table.schema());
}

public static void assertTableRecords(
Table table, List<Record> expected, String branch, Schema projectSchema) throws IOException {
table.refresh();
Snapshot snapshot = latestSnapshot(table, branch);

@@ -327,12 +350,15 @@ public static void assertTableRecords(Table table, List<Record> expected, String
return;
}

Types.StructType type = table.schema().asStruct();
Types.StructType type = projectSchema.asStruct();
StructLikeSet expectedSet = StructLikeSet.create(type);
expectedSet.addAll(expected);

try (CloseableIterable<Record> iterable =
IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) {
IcebergGenerics.read(table)
.useSnapshot(snapshot.snapshotId())
.project(projectSchema)
.build()) {
StructLikeSet actualSet = StructLikeSet.create(type);

for (Record record : iterable) {
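A rough usage sketch of the new helpers, for a hypothetical compaction test that checks lineage survives a rewrite (the package for `SimpleDataUtil`, the `main` branch name, and the concrete lineage values are all assumptions):

```java
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.SimpleDataUtil;

// Hypothetical verification step: expected records carry explicit lineage values,
// and the read side projects the lineage metadata columns so they are compared.
class RowLineageAssertionSketch {
  static void assertLineagePreserved(Table table) throws IOException {
    List<Record> expected =
        List.of(
            SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L),
            SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 1L));
    SimpleDataUtil.assertTableRecords(
        table, expected, "main", MetadataColumns.schemaWithRowLineage(table.schema()));
  }
}
```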
@@ -57,6 +57,7 @@
import org.apache.flink.types.Row;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericDataUtil;
@@ -176,6 +177,16 @@ public static void assertRowData(
LogicalType rowType,
StructLike expectedRecord,
RowData actualRowData) {
assertRowData(structType, rowType, expectedRecord, actualRowData, null, -1);
}

public static void assertRowData(
Types.StructType structType,
LogicalType rowType,
StructLike expectedRecord,
RowData actualRowData,
Map<Integer, Object> idToConstant,
int rowPosition) {
if (expectedRecord == null && actualRowData == null) {
return;
}
@@ -197,21 +208,18 @@
LogicalType logicalType = ((RowType) rowType).getTypeAt(pos);
Object actualValue =
FlinkRowData.createFieldGetter(logicalType, pos).getFieldOrNull(actualRowData);
Object expectedValue;
if (expectedField != null) {
assertEquals(
field.type(), logicalType, expected.getField(expectedField.name()), actualValue);
expectedValue = getExpectedValue(idToConstant, rowPosition, expectedField, expected);
} else {
// convert the initial value to generic because that is the data model used to generate
// the expected records
assertEquals(
field.type(),
logicalType,
GenericDataUtil.internalToGeneric(field.type(), field.initialDefault()),
actualValue);
expectedValue = GenericDataUtil.internalToGeneric(field.type(), field.initialDefault());
}
pos += 1;
}

assertEquals(field.type(), logicalType, expectedValue, actualValue);
pos++;
}
} else {
for (int i = 0; i < types.size(); i += 1) {
LogicalType logicalType = ((RowType) rowType).getTypeAt(i);
@@ -223,6 +231,30 @@
}
}

private static Object getExpectedValue(
Map<Integer, Object> idToConstant,
int pos,
Types.NestedField expectedField,
Record expected) {
Object expectedValue;
int id = expectedField.fieldId();
if (id == MetadataColumns.ROW_ID.fieldId()) {
expectedValue = expected.getField(expectedField.name());
if (expectedValue == null && idToConstant != null) {
expectedValue = (Long) idToConstant.get(id) + pos;
}
} else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) {
expectedValue = expected.getField(expectedField.name());
if (expectedValue == null && idToConstant != null) {
expectedValue = idToConstant.get(id);
}
} else {
expectedValue = expected.getField(expectedField.name());
}

return expectedValue;
}

private static void assertEquals(
Type type, LogicalType logicalType, Object expected, Object actual) {

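Review note on `getExpectedValue` above: a null stored `_row_id` is expected to be inherited from the file's constants as the first-row-id base plus the row's position, and a null `_last_updated_sequence_number` falls back to the file's sequence-number constant. A small worked example of that rule with made-up numbers:

```java
// Made-up numbers illustrating the inheritance rule encoded in getExpectedValue():
// rows written without explicit lineage values derive them from file-level constants.
class RowIdInheritanceSketch {
  public static void main(String[] args) {
    long firstRowId = 100L;        // constant keyed by MetadataColumns.ROW_ID in idToConstant
    long fileSequenceNumber = 7L;  // constant keyed by LAST_UPDATED_SEQUENCE_NUMBER

    for (int pos = 0; pos < 3; pos++) {
      long rowId = firstRowId + pos;          // null stored _row_id -> base + position
      long lastUpdated = fileSequenceNumber;  // null stored value -> file's sequence number
      System.out.printf(
          "pos=%d _row_id=%d _last_updated_sequence_number=%d%n", pos, rowId, lastUpdated);
    }
  }
}
```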