FlinkAppenderFactory.java
@@ -30,6 +30,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -55,23 +56,22 @@ public class FlinkAppenderFactory implements FileAppenderFactory<RowData>, Serializable {
private final int[] equalityFieldIds;
private final Schema eqDeleteRowSchema;
private final Schema posDeleteRowSchema;
private final Table table;

private RowType eqDeleteFlinkSchema = null;
private RowType posDeleteFlinkSchema = null;

public FlinkAppenderFactory(
Schema schema, RowType flinkSchema, Map<String, String> props, PartitionSpec spec) {
this(schema, flinkSchema, props, spec, null, null, null);
}

public FlinkAppenderFactory(
Table table,
Schema schema,
RowType flinkSchema,
Map<String, String> props,
PartitionSpec spec,
int[] equalityFieldIds,
Schema eqDeleteRowSchema,
Schema posDeleteRowSchema) {
Preconditions.checkNotNull(table, "Table shouldn't be null");
this.table = table;
this.schema = schema;
this.flinkSchema = flinkSchema;
this.props = props;
@@ -99,7 +99,7 @@ private RowType lazyPosDeleteFlinkSchema() {

@Override
public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
try {
switch (format) {
case AVRO:
@@ -160,7 +160,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
eqDeleteRowSchema,
"Equality delete row schema shouldn't be null when creating equality-delete writer");

MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
try {
switch (format) {
case AVRO:
@@ -216,7 +216,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
@Override
public PositionDeleteWriter<RowData> newPosDeleteWriter(
EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table);
try {
switch (format) {
case AVRO:
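Taken together, these hunks change the FlinkAppenderFactory contract: the raw properties map is no longer the source of metrics configuration. Data and equality-delete writers now use MetricsConfig.forTable(table), position-delete writers use MetricsConfig.forPositionDelete(table), and the constructor therefore requires a non-null Table (the old four-argument convenience constructor is removed). The sketch below shows the new call shape for a plain data-file appender; it is an illustration only, and the class name, output file, and chosen file format are assumptions, not part of this PR.

// Sketch (not part of the PR): building a data-file appender against the new
// constructor. `table` and `outputFile` are assumed inputs; any Iceberg Table
// handle and local file work for illustration.
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
import org.apache.iceberg.io.FileAppender;

public class FlinkAppenderFactoryUsageSketch {
  public static FileAppender<RowData> newDataAppender(Table table, java.io.File outputFile) {
    RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
    // The table is now the required first argument; metrics configuration is resolved
    // from it (MetricsConfig.forTable) instead of from the properties map.
    FlinkAppenderFactory appenderFactory =
        new FlinkAppenderFactory(
            table,
            table.schema(),
            flinkSchema,
            table.properties(),
            table.spec(),
            null, // equalityFieldIds: not needed for plain data files
            null, // eqDeleteRowSchema
            null); // posDeleteRowSchema
    // Caller is responsible for closing the returned appender.
    return appenderFactory.newAppender(Files.localOutput(outputFile), FileFormat.PARQUET);
  }
}

Resolving MetricsConfig from the Table keeps metrics settings consistent with the table metadata, rather than with whatever subset of properties a caller happened to pass in.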
RowDataTaskWriterFactory.java
@@ -72,7 +72,9 @@ public RowDataTaskWriterFactory(
this.upsert = upsert;

if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, writeProperties, spec);
this.appenderFactory =
new FlinkAppenderFactory(
table, schema, flinkSchema, writeProperties, spec, null, null, null);
} else if (upsert) {
// In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of
// the inserted row
@@ -81,6 +83,7 @@
// that are correct for the deleted row. Therefore, only write the equality delete fields.
this.appenderFactory =
new FlinkAppenderFactory(
table,
schema,
flinkSchema,
writeProperties,
@@ -91,6 +94,7 @@
} else {
this.appenderFactory =
new FlinkAppenderFactory(
table,
schema,
flinkSchema,
writeProperties,
SimpleDataUtil.java
@@ -125,6 +125,7 @@ public static RowData createUpdateAfter(Integer id, String data) {
}

public static DataFile writeFile(
Table table,
Schema schema,
PartitionSpec spec,
Configuration conf,
@@ -138,7 +139,8 @@ public static DataFile writeFile(

RowType flinkSchema = FlinkSchemaUtil.convert(schema);
FileAppenderFactory<RowData> appenderFactory =
new FlinkAppenderFactory(schema, flinkSchema, ImmutableMap.of(), spec);
new FlinkAppenderFactory(
table, schema, flinkSchema, ImmutableMap.of(), spec, null, null, null);

FileAppender<RowData> appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat);
try (FileAppender<RowData> closeableAppender = appender) {
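SimpleDataUtil.writeFile is a test helper that builds a FlinkAppenderFactory internally, so it needs the same Table handle. The call sites later in this diff pass the table as the new first argument, followed by the schema, spec, Hadoop Configuration, target location, file name, and rows. A hedged usage sketch follows; the meaning of the trailing parameters is inferred from those call sites rather than from the collapsed method signature.

// Sketch (assumption-laden): the location/filename arguments are inferred from the
// writeDataFile call sites in this diff; SimpleDataUtil is the Flink module's test utility.
import java.io.IOException;
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.SimpleDataUtil;

public class WriteFileSketch {
  public static DataFile writeParquetDataFile(Table table, List<RowData> rows) throws IOException {
    // The table now comes first so the helper can build a table-aware FlinkAppenderFactory.
    return SimpleDataUtil.writeFile(
        table,
        table.schema(),
        table.spec(),
        new Configuration(),
        table.location(), // directory in which the file is created
        FileFormat.PARQUET.addExtension("data-00000"), // file name with format suffix
        rows);
  }
}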
TestFlinkAppenderFactory.java
@@ -31,7 +31,6 @@
import org.apache.iceberg.util.StructLikeSet;

public class TestFlinkAppenderFactory extends TestAppenderFactory<RowData> {

private final RowType rowType;

public TestFlinkAppenderFactory(String fileFormat, boolean partitioned) {
@@ -43,6 +42,7 @@ public TestFlinkAppenderFactory(String fileFormat, boolean partitioned) {
protected FileAppenderFactory<RowData> createAppenderFactory(
List<Integer> equalityFieldIds, Schema eqDeleteSchema, Schema posDeleteRowSchema) {
return new FlinkAppenderFactory(
table,
table.schema(),
rowType,
table.properties(),
TestFlinkManifest.java
@@ -79,6 +79,7 @@ public void before() throws IOException {
};
this.appenderFactory =
new FlinkAppenderFactory(
table,
table.schema(),
FlinkSchemaUtil.convert(table.schema()),
table.properties(),
@@ -255,6 +256,7 @@ public ManifestFile deserialize(int version, byte[] serialized) throws IOException {

private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
return SimpleDataUtil.writeFile(
table,
table.schema(),
table.spec(),
CONF,
TestIcebergFilesCommitter.java
@@ -771,6 +771,7 @@ private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
table.schema().findField("id").fieldId(), table.schema().findField("data").fieldId()
};
return new FlinkAppenderFactory(
table,
table.schema(),
FlinkSchemaUtil.convert(table.schema()),
table.properties(),
@@ -813,7 +814,7 @@ private List<Path> assertFlinkManifests(int expectedCount) throws IOException {

private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
return SimpleDataUtil.writeFile(
table.schema(), table.spec(), CONF, tablePath, format.addExtension(filename), rows);
table, table.schema(), table.spec(), CONF, tablePath, format.addExtension(filename), rows);
}

private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
TestFlinkMergingMetrics.java
@@ -27,13 +27,23 @@
import org.apache.iceberg.TestMergingMetrics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.RowDataConverter;
import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;

public class TestFlinkMergingMetrics extends TestMergingMetrics<RowData> {

@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

@Rule
public final HadoopTableResource tableResource =
new HadoopTableResource(TEMPORARY_FOLDER, "test_db", "test_table", SCHEMA);

public TestFlinkMergingMetrics(FileFormat fileFormat) {
super(fileFormat);
}
@@ -44,7 +54,14 @@ protected FileAppender<RowData> writeAndGetAppender(List<Record> records) throws

FileAppender<RowData> appender =
new FlinkAppenderFactory(
SCHEMA, flinkSchema, ImmutableMap.of(), PartitionSpec.unpartitioned())
tableResource.table(),
SCHEMA,
flinkSchema,
ImmutableMap.of(),
PartitionSpec.unpartitioned(),
null,
null,
null)
.newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
try (FileAppender<RowData> fileAppender = appender) {
records.stream().map(r -> RowDataConverter.convert(SCHEMA, r)).forEach(fileAppender::add);
FlinkAppenderFactory.java (second Flink version module)
@@ -30,6 +30,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -55,23 +56,22 @@ public class FlinkAppenderFactory implements FileAppenderFactory<RowData>, Serializable {
private final int[] equalityFieldIds;
private final Schema eqDeleteRowSchema;
private final Schema posDeleteRowSchema;
private final Table table;

private RowType eqDeleteFlinkSchema = null;
private RowType posDeleteFlinkSchema = null;

public FlinkAppenderFactory(
Schema schema, RowType flinkSchema, Map<String, String> props, PartitionSpec spec) {
this(schema, flinkSchema, props, spec, null, null, null);
}

public FlinkAppenderFactory(
Table table,
Schema schema,
RowType flinkSchema,
Map<String, String> props,
PartitionSpec spec,
int[] equalityFieldIds,
Schema eqDeleteRowSchema,
Schema posDeleteRowSchema) {
Preconditions.checkNotNull(table, "Table shouldn't be null");
this.table = table;
this.schema = schema;
this.flinkSchema = flinkSchema;
this.props = props;
@@ -99,7 +99,7 @@ private RowType lazyPosDeleteFlinkSchema() {

@Override
public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
try {
switch (format) {
case AVRO:
@@ -160,7 +160,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
eqDeleteRowSchema,
"Equality delete row schema shouldn't be null when creating equality-delete writer");

MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
try {
switch (format) {
case AVRO:
@@ -216,7 +216,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
@Override
public PositionDeleteWriter<RowData> newPosDeleteWriter(
EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table);
try {
switch (format) {
case AVRO:
RowDataTaskWriterFactory.java
@@ -72,7 +72,9 @@ public RowDataTaskWriterFactory(
this.upsert = upsert;

if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, writeProperties, spec);
this.appenderFactory =
new FlinkAppenderFactory(
table, schema, flinkSchema, writeProperties, spec, null, null, null);
} else if (upsert) {
// In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of
// the inserted row
@@ -81,6 +83,7 @@
// that are correct for the deleted row. Therefore, only write the equality delete fields.
this.appenderFactory =
new FlinkAppenderFactory(
table,
schema,
flinkSchema,
writeProperties,
@@ -91,6 +94,7 @@
} else {
this.appenderFactory =
new FlinkAppenderFactory(
table,
schema,
flinkSchema,
writeProperties,
SimpleDataUtil.java
@@ -125,6 +125,7 @@ public static RowData createUpdateAfter(Integer id, String data) {
}

public static DataFile writeFile(
Table table,
Schema schema,
PartitionSpec spec,
Configuration conf,
@@ -138,7 +139,8 @@ public static DataFile writeFile(

RowType flinkSchema = FlinkSchemaUtil.convert(schema);
FileAppenderFactory<RowData> appenderFactory =
new FlinkAppenderFactory(schema, flinkSchema, ImmutableMap.of(), spec);
new FlinkAppenderFactory(
table, schema, flinkSchema, ImmutableMap.of(), spec, null, null, null);

FileAppender<RowData> appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat);
try (FileAppender<RowData> closeableAppender = appender) {
TestFlinkAppenderFactory.java
@@ -43,6 +43,7 @@ public TestFlinkAppenderFactory(String fileFormat, boolean partitioned) {
protected FileAppenderFactory<RowData> createAppenderFactory(
List<Integer> equalityFieldIds, Schema eqDeleteSchema, Schema posDeleteRowSchema) {
return new FlinkAppenderFactory(
table,
table.schema(),
rowType,
table.properties(),
TestFlinkManifest.java
@@ -78,6 +78,7 @@ public void before() throws IOException {
};
this.appenderFactory =
new FlinkAppenderFactory(
table,
table.schema(),
FlinkSchemaUtil.convert(table.schema()),
table.properties(),
@@ -254,6 +255,7 @@ public ManifestFile deserialize(int version, byte[] serialized) throws IOException {

private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
return SimpleDataUtil.writeFile(
table,
table.schema(),
table.spec(),
CONF,
TestIcebergFilesCommitter.java
@@ -766,6 +766,7 @@ private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
table.schema().findField("id").fieldId(), table.schema().findField("data").fieldId()
};
return new FlinkAppenderFactory(
table,
table.schema(),
FlinkSchemaUtil.convert(table.schema()),
table.properties(),
@@ -808,7 +809,13 @@ private List<Path> assertFlinkManifests(int expectedCount) throws IOException {

private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
return SimpleDataUtil.writeFile(
table.schema(), table.spec(), CONF, table.location(), format.addExtension(filename), rows);
table,
table.schema(),
table.spec(),
CONF,
table.location(),
format.addExtension(filename),
rows);
}

private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
TestFlinkMergingMetrics.java
@@ -27,12 +27,21 @@
import org.apache.iceberg.TestMergingMetrics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.RowDataConverter;
import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;

public class TestFlinkMergingMetrics extends TestMergingMetrics<RowData> {
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

@Rule
public final HadoopTableResource tableResource =
new HadoopTableResource(TEMPORARY_FOLDER, "test_db", "test_table", SCHEMA);

public TestFlinkMergingMetrics(FileFormat fileFormat) {
super(fileFormat);
@@ -44,7 +53,14 @@ protected FileAppender<RowData> writeAndGetAppender(List<Record> records) throws

FileAppender<RowData> appender =
new FlinkAppenderFactory(
SCHEMA, flinkSchema, ImmutableMap.of(), PartitionSpec.unpartitioned())
tableResource.table(),
SCHEMA,
flinkSchema,
ImmutableMap.of(),
PartitionSpec.unpartitioned(),
null,
null,
null)
.newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
try (FileAppender<RowData> fileAppender = appender) {
records.stream().map(r -> RowDataConverter.convert(SCHEMA, r)).forEach(fileAppender::add);