@@ -29,8 +29,6 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.SerializableTable;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableUtil;
import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
import org.apache.iceberg.actions.FileRewritePlan;
import org.apache.iceberg.actions.RewriteDataFiles;
@@ -93,10 +91,6 @@ public DataFileRewritePlanner(
@Override
public void open(Configuration parameters) throws Exception {
tableLoader.open();
-    Table table = tableLoader.loadTable();
-    Preconditions.checkArgument(
-        !TableUtil.supportsRowLineage(table),
-        "Flink does not support compaction on row lineage enabled tables (V3+)");
this.errorCounter =
TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex)
.counter(TableMaintenanceMetrics.ERROR_COUNTER);
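Note on the hunk above: the up-front rejection of row-lineage (format v3+) tables is removed from the planner, and the rewrite runner in the next file now checks lineage support itself and carries the lineage columns through the rewrite. A minimal, illustrative sketch of that check; the class and method names here are hypothetical, only the TableUtil call comes from this change:

import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;

// Hypothetical helper: decide per table whether a rewrite should preserve row lineage,
// mirroring the preserveRowId flag used by the runner below.
public class RowLineageSupportCheck {
  static boolean preserveRowId(Table table) {
    return TableUtil.supportsRowLineage(table);
  }
}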
@@ -28,11 +28,15 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.actions.RewriteFileGroup;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup;
@@ -108,8 +112,10 @@ public void processElement(PlannedGroup value, Context ctx, Collector<ExecutedGr
value.group().rewrittenFiles().size());
}

-    try (TaskWriter<RowData> writer = writerFor(value)) {
-      try (DataIterator<RowData> iterator = readerFor(value)) {
+    boolean preserveRowId = TableUtil.supportsRowLineage(value.table());
+
+    try (TaskWriter<RowData> writer = writerFor(value, preserveRowId)) {
+      try (DataIterator<RowData> iterator = readerFor(value, preserveRowId)) {
while (iterator.hasNext()) {
writer.write(iterator.next());
}
@@ -169,30 +175,42 @@ public void processElement(PlannedGroup value, Context ctx, Collector<ExecutedGr
}
}

-  private TaskWriter<RowData> writerFor(PlannedGroup value) {
+  private TaskWriter<RowData> writerFor(PlannedGroup value, boolean preserveRowId) {
String formatString =
PropertyUtil.propertyAsString(
value.table().properties(),
TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    Schema writeSchema =
+        preserveRowId
+            ? MetadataColumns.schemaWithRowLineage(value.table().schema())
+            : value.table().schema();
+    RowType flinkWriteType = FlinkSchemaUtil.convert(writeSchema);
RowDataTaskWriterFactory factory =
new RowDataTaskWriterFactory(
-            value.table(),
-            FlinkSchemaUtil.convert(value.table().schema()),
+            value::table,
+            flinkWriteType,
value.group().inputSplitSize(),
FileFormat.fromString(formatString),
value.table().properties(),
null,
-            false);
+            false,
+            writeSchema,
+            value.table().spec());
factory.initialize(subTaskId, attemptId);
return factory.create();
}

-  private DataIterator<RowData> readerFor(PlannedGroup value) {
+  private DataIterator<RowData> readerFor(PlannedGroup value, boolean preserveRowId) {
+    Schema projectedSchema =
+        preserveRowId
+            ? MetadataColumns.schemaWithRowLineage(value.table().schema())
+            : value.table().schema();
+
RowDataFileScanTaskReader reader =
new RowDataFileScanTaskReader(
value.table().schema(),
-            value.table().schema(),
+            projectedSchema,
PropertyUtil.propertyAsString(value.table().properties(), DEFAULT_NAME_MAPPING, null),
false,
Collections.emptyList());
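For context on the schema handling above: MetadataColumns.schemaWithRowLineage(...) extends a table schema with the row-lineage metadata columns, _row_id and _last_updated_sequence_number (the same two columns that SCHEMA3 spells out in the test utilities further down). A small, self-contained sketch of how the runner derives its write type, assuming an illustrative two-column base schema:

import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Types;

public class RowLineageWriteSchemaSketch {
  public static void main(String[] args) {
    // Illustrative base schema; in the runner this is value.table().schema().
    Schema base =
        new Schema(
            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    // Append the row-lineage metadata columns so rewritten files keep each row's
    // _row_id and _last_updated_sequence_number.
    Schema writeSchema = MetadataColumns.schemaWithRowLineage(base);

    // Convert to the Flink row type that is handed to RowDataTaskWriterFactory.
    RowType flinkWriteType = FlinkSchemaUtil.convert(writeSchema);
    System.out.println(flinkWriteType);
  }
}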
@@ -46,7 +46,6 @@
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;
@@ -84,13 +83,7 @@ public RowDataFileScanTaskReader(
@Override
public CloseableIterator<RowData> open(
FileScanTask task, InputFilesDecryptor inputFilesDecryptor) {
-    Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds());
-
-    Map<Integer, ?> idToConstant =
-        partitionSchema.columns().isEmpty()
-            ? ImmutableMap.of()
-            : PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
-
+    Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
FlinkDeleteFilter deletes =
new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor);
CloseableIterable<RowData> iterable =
@@ -89,6 +89,13 @@ private SimpleDataUtil() {}
Types.NestedField.optional(2, "data", Types.StringType.get()),
Types.NestedField.optional(3, "extra", Types.StringType.get()));

public static final Schema SCHEMA3 =
new Schema(
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()),
Types.NestedField.optional(3, "_row_id", Types.LongType.get()),
Types.NestedField.optional(4, "_last_updated_sequence_number", Types.LongType.get()));

public static final ResolvedSchema FLINK_SCHEMA =
ResolvedSchema.of(
Column.physical("id", DataTypes.INT()), Column.physical("data", DataTypes.STRING()));
@@ -100,6 +107,7 @@ private SimpleDataUtil() {}

public static final Record RECORD = GenericRecord.create(SCHEMA);
public static final Record RECORD2 = GenericRecord.create(SCHEMA2);
public static final Record RECORD3 = GenericRecord.create(SCHEMA3);

public static Table createTable(
String path, Map<String, String> properties, boolean partitioned) {
@@ -127,6 +135,16 @@ public static Record createRecord(Integer id, String data, String extra) {
return record;
}

public static Record createRecordWithRowId(
Integer id, String data, Long rowId, Long lastUpdatedSequenceNumber) {
Record record = RECORD3.copy();
record.setField("id", id);
record.setField("data", data);
record.setField("_row_id", rowId);
record.setField("_last_updated_sequence_number", lastUpdatedSequenceNumber);
return record;
}

public static RowData createRowData(Integer id, String data) {
return GenericRowData.of(id, StringData.fromString(data));
}
@@ -348,6 +366,11 @@ public static void assertTableRecords(Table table, List<Record> expected) throws

public static void assertTableRecords(Table table, List<Record> expected, String branch)
throws IOException {
assertTableRecords(table, expected, branch, table.schema());
}

public static void assertTableRecords(
Table table, List<Record> expected, String branch, Schema projectSchema) throws IOException {
table.refresh();
Snapshot snapshot = latestSnapshot(table, branch);

@@ -360,12 +383,15 @@ public static void assertTableRecords(Table table, List<Record> expected, String
return;
}

-    Types.StructType type = table.schema().asStruct();
+    Types.StructType type = projectSchema.asStruct();
StructLikeSet expectedSet = StructLikeSet.create(type);
expectedSet.addAll(expected);

try (CloseableIterable<Record> iterable =
-        IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) {
+        IcebergGenerics.read(table)
+            .useSnapshot(snapshot.snapshotId())
+            .project(projectSchema)
+            .build()) {
StructLikeSet actualSet = StructLikeSet.create(type);

for (Record record : iterable) {
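A condensed, hypothetical usage sketch of the two helpers added above (createRecordWithRowId and the assertTableRecords overload that takes a projection schema); it mirrors the assertions in the tests below, and the concrete lineage values are only illustrative:

// Inside a test against a format v3 table (createTable(3) / insert(...) as used below):
Schema projected = MetadataColumns.schemaWithRowLineage(table.schema());
SimpleDataUtil.assertTableRecords(
    table,
    ImmutableList.of(
        SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L),
        SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L)),
    SnapshotRef.MAIN_BRANCH,
    projected);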
@@ -33,6 +33,9 @@
import java.util.stream.StreamSupport;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.SimpleDataUtil;
@@ -79,6 +82,118 @@ void testRewriteUnpartitioned() throws Exception {
createRecord(4, "d")));
}

@Test
void testRewriteUnpartitionedPreserveLineage() throws Exception {
Table table = createTable(3);
insert(table, 1, "a");
insert(table, 2, "b");
insert(table, 3, "c");
insert(table, 4, "d");

assertFileNum(table, 4, 0);

appendRewriteDataFiles(
RewriteDataFiles.builder()
.parallelism(2)
.deleteFileThreshold(10)
.targetFileSizeBytes(1_000_000L)
.maxFileGroupSizeBytes(10_000_000L)
.maxFileSizeBytes(2_000_000L)
.minFileSizeBytes(500_000L)
.minInputFiles(2)
.partialProgressEnabled(true)
.partialProgressMaxCommits(1)
.maxRewriteBytes(100_000L)
.rewriteAll(false));

runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());

assertFileNum(table, 1, 0);

Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
SimpleDataUtil.assertTableRecords(
table,
ImmutableList.of(
SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L),
SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L),
SimpleDataUtil.createRecordWithRowId(3, "c", 2L, 3L),
SimpleDataUtil.createRecordWithRowId(4, "d", 3L, 4L)),
SnapshotRef.MAIN_BRANCH,
schema);
}

@Test
void testRewriteTheSameFilePreserveLineage() throws Exception {
Table table = createTable(3);
insert(table, 1, "a");
insert(table, 2, "b");
    // Write one file containing two rows to verify that the _row_id values are read correctly.
insert(
table,
ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), SimpleDataUtil.createRecord(4, "d")));

assertFileNum(table, 3, 0);

appendRewriteDataFiles(
RewriteDataFiles.builder()
.parallelism(2)
.deleteFileThreshold(10)
.targetFileSizeBytes(1_000_000L)
.maxFileGroupSizeBytes(10_000_000L)
.maxFileSizeBytes(2_000_000L)
.minFileSizeBytes(500_000L)
.minInputFiles(2)
.partialProgressEnabled(true)
.partialProgressMaxCommits(1)
.maxRewriteBytes(100_000L)
.rewriteAll(false));

runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());

assertFileNum(table, 1, 0);

Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
SimpleDataUtil.assertTableRecords(
table,
ImmutableList.of(
SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L),
SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L),
            // Rows with ids 3 and 4 come from the same data file, so their
            // _last_updated_sequence_number values should be the same.
SimpleDataUtil.createRecordWithRowId(3, "c", 2L, 3L),
SimpleDataUtil.createRecordWithRowId(4, "d", 3L, 3L)),
SnapshotRef.MAIN_BRANCH,
schema);
}

@Test
void testRewritePartitionedPreserveLineage() throws Exception {
Table table = createPartitionedTable(3);
insertPartitioned(table, 1, "p1");
insertPartitioned(table, 2, "p1");
insertPartitioned(table, 3, "p2");
insertPartitioned(table, 4, "p2");

assertFileNum(table, 4, 0);

appendRewriteDataFiles();

runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());

assertFileNum(table, 2, 0);

Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
SimpleDataUtil.assertTableRecords(
table,
ImmutableList.of(
SimpleDataUtil.createRecordWithRowId(1, "p1", 0L, 1L),
SimpleDataUtil.createRecordWithRowId(2, "p1", 1L, 2L),
SimpleDataUtil.createRecordWithRowId(3, "p2", 2L, 3L),
SimpleDataUtil.createRecordWithRowId(4, "p2", 3L, 4L)),
SnapshotRef.MAIN_BRANCH,
schema);
}

@Test
void testRewritePartitioned() throws Exception {
Table table = createPartitionedTable();