diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index 85289eceff15..b6f1392d1562 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -30,6 +30,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -55,16 +56,13 @@ public class FlinkAppenderFactory implements FileAppenderFactory<RowData>, Serializable {
   private final int[] equalityFieldIds;
   private final Schema eqDeleteRowSchema;
   private final Schema posDeleteRowSchema;
+  private final Table table;
   private RowType eqDeleteFlinkSchema = null;
   private RowType posDeleteFlinkSchema = null;
 
   public FlinkAppenderFactory(
-      Schema schema, RowType flinkSchema, Map<String, String> props, PartitionSpec spec) {
-    this(schema, flinkSchema, props, spec, null, null, null);
-  }
-
-  public FlinkAppenderFactory(
+      Table table,
       Schema schema,
       RowType flinkSchema,
       Map<String, String> props,
@@ -72,6 +70,8 @@ public FlinkAppenderFactory(
       int[] equalityFieldIds,
       Schema eqDeleteRowSchema,
       Schema posDeleteRowSchema) {
+    Preconditions.checkNotNull(table, "Table shouldn't be null");
+    this.table = table;
     this.schema = schema;
     this.flinkSchema = flinkSchema;
     this.props = props;
@@ -99,7 +99,7 @@ private RowType lazyPosDeleteFlinkSchema() {
 
   @Override
   public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
     try {
       switch (format) {
         case AVRO:
@@ -160,7 +160,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
         eqDeleteRowSchema,
         "Equality delete row schema shouldn't be null when creating equality-delete writer");
 
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
     try {
       switch (format) {
         case AVRO:
@@ -216,7 +216,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
   @Override
   public PositionDeleteWriter<RowData> newPosDeleteWriter(
       EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table);
     try {
       switch (format) {
         case AVRO:
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 634c2dfddaed..c624eb3f0276 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -72,7 +72,9 @@ public RowDataTaskWriterFactory(
     this.upsert = upsert;
 
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
-      this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, writeProperties, spec);
+      this.appenderFactory =
+          new FlinkAppenderFactory(
+              table, schema, flinkSchema, writeProperties, spec, null, null, null);
     } else if (upsert) {
       // In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of
       // the inserted row
@@ -81,6 +83,7 @@ public RowDataTaskWriterFactory(
       // that are correct for the deleted row. Therefore, only write the equality delete fields.
       this.appenderFactory =
           new FlinkAppenderFactory(
+              table,
               schema,
               flinkSchema,
               writeProperties,
@@ -91,6 +94,7 @@ public RowDataTaskWriterFactory(
     } else {
       this.appenderFactory =
           new FlinkAppenderFactory(
+              table,
               schema,
               flinkSchema,
               writeProperties,
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 7a7bfdfff3df..e50123ab9137 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -125,6 +125,7 @@ public static RowData createUpdateAfter(Integer id, String data) {
   }
 
   public static DataFile writeFile(
+      Table table,
       Schema schema,
       PartitionSpec spec,
       Configuration conf,
@@ -138,7 +139,8 @@ public static DataFile writeFile(
     RowType flinkSchema = FlinkSchemaUtil.convert(schema);
 
     FileAppenderFactory<RowData> appenderFactory =
-        new FlinkAppenderFactory(schema, flinkSchema, ImmutableMap.of(), spec);
+        new FlinkAppenderFactory(
+            table, schema, flinkSchema, ImmutableMap.of(), spec, null, null, null);
 
     FileAppender<RowData> appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat);
     try (FileAppender<RowData> closeableAppender = appender) {
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
index 4c17cd7607df..282f993824bc 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
@@ -31,7 +31,6 @@ import org.apache.iceberg.util.StructLikeSet;
 
 public class TestFlinkAppenderFactory extends TestAppenderFactory<RowData> {
-
   private final RowType rowType;
 
   public TestFlinkAppenderFactory(String fileFormat, boolean partitioned) {
@@ -43,6 +42,7 @@ public TestFlinkAppenderFactory(String fileFormat, boolean partitioned) {
   protected FileAppenderFactory<RowData> createAppenderFactory(
       List<Integer> equalityFieldIds, Schema eqDeleteSchema, Schema posDeleteRowSchema) {
     return new FlinkAppenderFactory(
+        table,
         table.schema(),
         rowType,
         table.properties(),
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 7fe4e159fc61..5528e71b3db1 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -79,6 +79,7 @@ public void before() throws IOException {
         };
     this.appenderFactory =
         new FlinkAppenderFactory(
+            table,
             table.schema(),
             FlinkSchemaUtil.convert(table.schema()),
             table.properties(),
@@ -255,6 +256,7 @@ public ManifestFile deserialize(int version, byte[] serialized) throws IOException {
 
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
     return SimpleDataUtil.writeFile(
+        table,
         table.schema(),
         table.spec(),
         CONF,
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index e27b9093f19d..ca643ad53229 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -771,6 +771,7 @@ private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
           table.schema().findField("id").fieldId(), table.schema().findField("data").fieldId()
         };
     return new FlinkAppenderFactory(
+        table,
         table.schema(),
         FlinkSchemaUtil.convert(table.schema()),
         table.properties(),
@@ -813,7 +814,7 @@ private List<Path> assertFlinkManifests(int expectedCount) throws IOException {
 
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
     return SimpleDataUtil.writeFile(
-        table.schema(), table.spec(), CONF, tablePath, format.addExtension(filename), rows);
+        table, table.schema(), table.spec(), CONF, tablePath, format.addExtension(filename), rows);
   }
 
   private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
index 3a7ec96cb1d6..291d66538f6b 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
@@ -27,13 +27,23 @@
 import org.apache.iceberg.TestMergingMetrics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopTableResource;
 import org.apache.iceberg.flink.RowDataConverter;
 import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
 
 public class TestFlinkMergingMetrics extends TestMergingMetrics<RowData> {
 
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopTableResource tableResource =
+      new HadoopTableResource(TEMPORARY_FOLDER, "test_db", "test_table", SCHEMA);
+
   public TestFlinkMergingMetrics(FileFormat fileFormat) {
     super(fileFormat);
   }
@@ -44,7 +54,14 @@ protected FileAppender<RowData> writeAndGetAppender(List<Record> records) throws Exception {
 
     FileAppender<RowData> appender =
         new FlinkAppenderFactory(
-                SCHEMA, flinkSchema, ImmutableMap.of(), PartitionSpec.unpartitioned())
+                tableResource.table(),
+                SCHEMA,
+                flinkSchema,
+                ImmutableMap.of(),
+                PartitionSpec.unpartitioned(),
+                null,
+                null,
+                null)
             .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
     try (FileAppender<RowData> fileAppender = appender) {
       records.stream().map(r -> RowDataConverter.convert(SCHEMA, r)).forEach(fileAppender::add);
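Note on the API change above: with the four-argument convenience constructor removed, every call site now passes the Table up front, plus explicit nulls for the delete-related arguments when they are unused. A minimal sketch of the new call pattern for an append-only writer (illustrative only; it assumes an already-loaded Iceberg `Table` named `table`):

    // Illustrative: building the factory for plain data files with the new signature.
    RowType flinkRowType = FlinkSchemaUtil.convert(table.schema());
    FileAppenderFactory<RowData> appenderFactory =
        new FlinkAppenderFactory(
            table,               // now mandatory; MetricsConfig is resolved from this table
            table.schema(),
            flinkRowType,
            table.properties(),
            table.spec(),
            null,                // equalityFieldIds: none for append-only writes
            null,                // eqDeleteRowSchema
            null);               // posDeleteRowSchema

The v1.15 and v1.16 modules below receive the identical change.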
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index 85289eceff15..b6f1392d1562 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -30,6 +30,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -55,16 +56,13 @@ public class FlinkAppenderFactory implements FileAppenderFactory<RowData>, Serializable {
   private final int[] equalityFieldIds;
   private final Schema eqDeleteRowSchema;
   private final Schema posDeleteRowSchema;
+  private final Table table;
   private RowType eqDeleteFlinkSchema = null;
   private RowType posDeleteFlinkSchema = null;
 
   public FlinkAppenderFactory(
-      Schema schema, RowType flinkSchema, Map<String, String> props, PartitionSpec spec) {
-    this(schema, flinkSchema, props, spec, null, null, null);
-  }
-
-  public FlinkAppenderFactory(
+      Table table,
       Schema schema,
       RowType flinkSchema,
       Map<String, String> props,
@@ -72,6 +70,8 @@ public FlinkAppenderFactory(
       int[] equalityFieldIds,
       Schema eqDeleteRowSchema,
       Schema posDeleteRowSchema) {
+    Preconditions.checkNotNull(table, "Table shouldn't be null");
+    this.table = table;
     this.schema = schema;
     this.flinkSchema = flinkSchema;
     this.props = props;
@@ -99,7 +99,7 @@ private RowType lazyPosDeleteFlinkSchema() {
 
   @Override
   public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
     try {
       switch (format) {
         case AVRO:
@@ -160,7 +160,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
         eqDeleteRowSchema,
         "Equality delete row schema shouldn't be null when creating equality-delete writer");
 
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
     try {
       switch (format) {
         case AVRO:
@@ -216,7 +216,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
   @Override
   public PositionDeleteWriter<RowData> newPosDeleteWriter(
       EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table);
     try {
       switch (format) {
         case AVRO:
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 634c2dfddaed..c624eb3f0276 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -72,7 +72,9 @@ public RowDataTaskWriterFactory(
     this.upsert = upsert;
 
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
-      this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, writeProperties, spec);
+      this.appenderFactory =
+          new FlinkAppenderFactory(
+              table, schema, flinkSchema, writeProperties, spec, null, null, null);
     } else if (upsert) {
       // In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of
       // the inserted row
@@ -81,6 +83,7 @@ public RowDataTaskWriterFactory(
       // that are correct for the deleted row. Therefore, only write the equality delete fields.
       this.appenderFactory =
          new FlinkAppenderFactory(
+              table,
               schema,
               flinkSchema,
               writeProperties,
@@ -91,6 +94,7 @@ public RowDataTaskWriterFactory(
     } else {
       this.appenderFactory =
           new FlinkAppenderFactory(
+              table,
               schema,
               flinkSchema,
               writeProperties,
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index f5b60c63d708..1bd9c34e1c06 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -125,6 +125,7 @@ public static RowData createUpdateAfter(Integer id, String data) {
   }
 
   public static DataFile writeFile(
+      Table table,
       Schema schema,
       PartitionSpec spec,
       Configuration conf,
@@ -138,7 +139,8 @@ public static DataFile writeFile(
     RowType flinkSchema = FlinkSchemaUtil.convert(schema);
 
     FileAppenderFactory<RowData> appenderFactory =
-        new FlinkAppenderFactory(schema, flinkSchema, ImmutableMap.of(), spec);
+        new FlinkAppenderFactory(
+            table, schema, flinkSchema, ImmutableMap.of(), spec, null, null, null);
 
     FileAppender<RowData> appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat);
     try (FileAppender<RowData> closeableAppender = appender) {
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
index 4c17cd7607df..d25b2792ac65 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
@@ -43,6 +43,7 @@ public TestFlinkAppenderFactory(String fileFormat, boolean partitioned) {
   protected FileAppenderFactory<RowData> createAppenderFactory(
       List<Integer> equalityFieldIds, Schema eqDeleteSchema, Schema posDeleteRowSchema) {
     return new FlinkAppenderFactory(
+        table,
         table.schema(),
         rowType,
         table.properties(),
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 27dc665055cc..36801420106e 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -78,6 +78,7 @@ public void before() throws IOException {
         };
     this.appenderFactory =
         new FlinkAppenderFactory(
+            table,
             table.schema(),
             FlinkSchemaUtil.convert(table.schema()),
             table.properties(),
@@ -254,6 +255,7 @@ public ManifestFile deserialize(int version, byte[] serialized) throws IOException {
 
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
     return SimpleDataUtil.writeFile(
+        table,
         table.schema(),
         table.spec(),
         CONF,
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 6fc4a5639fcf..5a4cf6223c26 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -766,6 +766,7 @@ private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
           table.schema().findField("id").fieldId(), table.schema().findField("data").fieldId()
         };
     return new FlinkAppenderFactory(
+        table,
         table.schema(),
         FlinkSchemaUtil.convert(table.schema()),
         table.properties(),
@@ -808,7 +809,13 @@ private List<Path> assertFlinkManifests(int expectedCount) throws IOException {
 
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
     return SimpleDataUtil.writeFile(
-        table.schema(), table.spec(), CONF, table.location(), format.addExtension(filename), rows);
+        table,
+        table.schema(),
+        table.spec(),
+        CONF,
+        table.location(),
+        format.addExtension(filename),
+        rows);
   }
 
   private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
index 3a7ec96cb1d6..30ed8a9742e1 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
@@ -27,12 +27,21 @@
 import org.apache.iceberg.TestMergingMetrics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopTableResource;
 import org.apache.iceberg.flink.RowDataConverter;
 import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
 
 public class TestFlinkMergingMetrics extends TestMergingMetrics<RowData> {
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopTableResource tableResource =
+      new HadoopTableResource(TEMPORARY_FOLDER, "test_db", "test_table", SCHEMA);
 
   public TestFlinkMergingMetrics(FileFormat fileFormat) {
     super(fileFormat);
@@ -44,7 +53,14 @@ protected FileAppender<RowData> writeAndGetAppender(List<Record> records) throws Exception {
 
     FileAppender<RowData> appender =
         new FlinkAppenderFactory(
-                SCHEMA, flinkSchema, ImmutableMap.of(), PartitionSpec.unpartitioned())
+                tableResource.table(),
+                SCHEMA,
+                flinkSchema,
+                ImmutableMap.of(),
+                PartitionSpec.unpartitioned(),
+                null,
+                null,
+                null)
             .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
     try (FileAppender<RowData> fileAppender = appender) {
       records.stream().map(r -> RowDataConverter.convert(SCHEMA, r)).forEach(fileAppender::add);
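Note on the test changes: because MetricsConfig.forTable requires a live table, tests that previously built a FlinkAppenderFactory from a bare Schema and an empty properties map now need a real Table handle, which is what the HadoopTableResource rule above supplies. A sketch of the fixture pattern (JUnit 4; names mirror the diff, and SCHEMA is whatever schema the test class defines):

    // Illustrative JUnit 4 fixture: a throwaway Hadoop-catalog table for the test.
    @ClassRule public static final TemporaryFolder TEMP = new TemporaryFolder();

    @Rule
    public final HadoopTableResource tableResource =
        new HadoopTableResource(TEMP, "test_db", "test_table", SCHEMA);

    // tableResource.table() then supplies the Table argument that the
    // FlinkAppenderFactory constructor now requires.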
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index 85289eceff15..b6f1392d1562 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -30,6 +30,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -55,16 +56,13 @@ public class FlinkAppenderFactory implements FileAppenderFactory<RowData>, Serializable {
   private final int[] equalityFieldIds;
   private final Schema eqDeleteRowSchema;
   private final Schema posDeleteRowSchema;
+  private final Table table;
   private RowType eqDeleteFlinkSchema = null;
   private RowType posDeleteFlinkSchema = null;
 
   public FlinkAppenderFactory(
-      Schema schema, RowType flinkSchema, Map<String, String> props, PartitionSpec spec) {
-    this(schema, flinkSchema, props, spec, null, null, null);
-  }
-
-  public FlinkAppenderFactory(
+      Table table,
       Schema schema,
       RowType flinkSchema,
       Map<String, String> props,
@@ -72,6 +70,8 @@ public FlinkAppenderFactory(
       int[] equalityFieldIds,
       Schema eqDeleteRowSchema,
       Schema posDeleteRowSchema) {
+    Preconditions.checkNotNull(table, "Table shouldn't be null");
+    this.table = table;
     this.schema = schema;
     this.flinkSchema = flinkSchema;
     this.props = props;
@@ -99,7 +99,7 @@ private RowType lazyPosDeleteFlinkSchema() {
 
   @Override
   public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
     try {
       switch (format) {
         case AVRO:
@@ -160,7 +160,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
         eqDeleteRowSchema,
         "Equality delete row schema shouldn't be null when creating equality-delete writer");
 
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
     try {
       switch (format) {
         case AVRO:
@@ -216,7 +216,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
   @Override
   public PositionDeleteWriter<RowData> newPosDeleteWriter(
       EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table);
     try {
       switch (format) {
         case AVRO:
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 634c2dfddaed..c624eb3f0276 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -72,7 +72,9 @@ public RowDataTaskWriterFactory(
     this.upsert = upsert;
 
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
-      this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, writeProperties, spec);
+      this.appenderFactory =
+          new FlinkAppenderFactory(
+              table, schema, flinkSchema, writeProperties, spec, null, null, null);
     } else if (upsert) {
       // In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of
       // the inserted row
@@ -81,6 +83,7 @@ public RowDataTaskWriterFactory(
       // that are correct for the deleted row. Therefore, only write the equality delete fields.
       this.appenderFactory =
           new FlinkAppenderFactory(
+              table,
               schema,
               flinkSchema,
               writeProperties,
@@ -91,6 +94,7 @@ public RowDataTaskWriterFactory(
     } else {
       this.appenderFactory =
           new FlinkAppenderFactory(
+              table,
               schema,
               flinkSchema,
               writeProperties,
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index f5b60c63d708..1bd9c34e1c06 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -125,6 +125,7 @@ public static RowData createUpdateAfter(Integer id, String data) {
   }
 
   public static DataFile writeFile(
+      Table table,
       Schema schema,
       PartitionSpec spec,
       Configuration conf,
@@ -138,7 +139,8 @@ public static DataFile writeFile(
     RowType flinkSchema = FlinkSchemaUtil.convert(schema);
 
     FileAppenderFactory<RowData> appenderFactory =
-        new FlinkAppenderFactory(schema, flinkSchema, ImmutableMap.of(), spec);
+        new FlinkAppenderFactory(
+            table, schema, flinkSchema, ImmutableMap.of(), spec, null, null, null);
 
     FileAppender<RowData> appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat);
     try (FileAppender<RowData> closeableAppender = appender) {
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
index 4c17cd7607df..d25b2792ac65 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
@@ -43,6 +43,7 @@ public TestFlinkAppenderFactory(String fileFormat, boolean partitioned) {
   protected FileAppenderFactory<RowData> createAppenderFactory(
       List<Integer> equalityFieldIds, Schema eqDeleteSchema, Schema posDeleteRowSchema) {
     return new FlinkAppenderFactory(
+        table,
         table.schema(),
         rowType,
         table.properties(),
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 27dc665055cc..36801420106e 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -78,6 +78,7 @@ public void before() throws IOException {
         };
     this.appenderFactory =
         new FlinkAppenderFactory(
+            table,
             table.schema(),
             FlinkSchemaUtil.convert(table.schema()),
             table.properties(),
@@ -254,6 +255,7 @@ public ManifestFile deserialize(int version, byte[] serialized) throws IOException {
 
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
     return SimpleDataUtil.writeFile(
+        table,
         table.schema(),
         table.spec(),
         CONF,
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 6fc4a5639fcf..5a4cf6223c26 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -766,6 +766,7 @@ private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
           table.schema().findField("id").fieldId(), table.schema().findField("data").fieldId()
         };
     return new FlinkAppenderFactory(
+        table,
         table.schema(),
         FlinkSchemaUtil.convert(table.schema()),
         table.properties(),
@@ -808,7 +809,13 @@ private List<Path> assertFlinkManifests(int expectedCount) throws IOException {
 
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
     return SimpleDataUtil.writeFile(
-        table.schema(), table.spec(), CONF, table.location(), format.addExtension(filename), rows);
+        table,
+        table.schema(),
+        table.spec(),
+        CONF,
+        table.location(),
+        format.addExtension(filename),
+        rows);
   }
 
   private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
index 3a7ec96cb1d6..1d52acb2fe7b 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
@@ -27,13 +27,23 @@
 import org.apache.iceberg.TestMergingMetrics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopTableResource;
 import org.apache.iceberg.flink.RowDataConverter;
 import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
 
 public class TestFlinkMergingMetrics extends TestMergingMetrics<RowData> {
 
+  @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopTableResource tableResource =
+      new HadoopTableResource(TEMP_FOLDER, "test_db", "test_table", SCHEMA);
+
   public TestFlinkMergingMetrics(FileFormat fileFormat) {
     super(fileFormat);
   }
@@ -44,7 +54,14 @@ protected FileAppender<RowData> writeAndGetAppender(List<Record> records) throws Exception {
 
     FileAppender<RowData> appender =
         new FlinkAppenderFactory(
-                SCHEMA, flinkSchema, ImmutableMap.of(), PartitionSpec.unpartitioned())
+                tableResource.table(),
+                SCHEMA,
+                flinkSchema,
+                ImmutableMap.of(),
+                PartitionSpec.unpartitioned(),
+                null,
+                null,
+                null)
            .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
     try (FileAppender<RowData> fileAppender = appender) {
       records.stream().map(r -> RowDataConverter.convert(SCHEMA, r)).forEach(fileAppender::add);
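The substance of the change is where MetricsConfig comes from. MetricsConfig.fromProperties(props) only saw whatever map the caller happened to pass (often ImmutableMap.of(), i.e. pure defaults), whereas MetricsConfig.forTable(table) resolves the write.metadata.metrics.* settings from the table's own properties, and MetricsConfig.forPositionDelete(table) additionally keeps full metrics for the position-delete file's path and position columns. A minimal sketch of the difference, assuming a loadable Hadoop table at a hypothetical warehouse path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.MetricsConfig;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;

    public class MetricsConfigSketch {
      public static void main(String[] args) {
        // Hypothetical location; any loadable Iceberg table works here.
        Table table = new HadoopTables(new Configuration()).load("/tmp/warehouse/db/tbl");

        // Resolves write.metadata.metrics.default and the per-column
        // write.metadata.metrics.column.* overrides from the table's properties.
        MetricsConfig dataMetrics = MetricsConfig.forTable(table);

        // Same resolution, but position-delete files keep full metrics for
        // the file_path and pos columns regardless of the table settings.
        MetricsConfig posDeleteMetrics = MetricsConfig.forPositionDelete(table);
      }
    }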