From cbb3fb9505310f8206b5438b9ee19862e8d268db Mon Sep 17 00:00:00 2001
From: Junjie Chen
Date: Wed, 30 Nov 2022 10:09:35 +0800
Subject: [PATCH 01/10] Flink: use correct metric config for position deletes

---
 .palantir/revapi.yml                          |  6 +++++
 build.gradle                                  |  2 +-
 .../org/apache/iceberg/MetricsConfig.java     | 24 +++++++++++++++++++
 .../flink/sink/FlinkAppenderFactory.java      |  2 +-
 .../flink/sink/FlinkAppenderFactory.java      |  2 +-
 .../flink/sink/FlinkAppenderFactory.java      |  2 +-
 6 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index a4a7559b8e71..ea2a6b88cbfd 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -65,6 +65,12 @@ acceptedBreaks:
     - code: "java.method.removed"
       old: "method void org.apache.iceberg.io.DataWriter<T>::add(T)"
       justification: "Removing deprecated method"
+  "1.1.0":
+    org.apache.iceberg:iceberg-core:
+    - code: "java.method.addedToInterface"
+      new: "method org.apache.iceberg.MetricsConfig org.apache.iceberg.MetricsConfig::forPositionDelete(\
+        java.util.Map<java.lang.String, java.lang.String>)"
+      justification: "add new interface for flink metrics config"
   apache-iceberg-0.14.0:
     org.apache.iceberg:iceberg-api:
     - code: "java.class.defaultSerializationChanged"
diff --git a/build.gradle b/build.gradle
index bfc9822127b1..9aa9e9c9d88b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -111,7 +111,7 @@ subprojects {
     revapi {
       oldGroup = project.group
       oldName = project.name
-      oldVersion = "1.1.0"
+      oldVersion = "1.0.0"
     }
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/MetricsConfig.java b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
index 9315fde5d94a..6152b69140de 100644
--- a/core/src/main/java/org/apache/iceberg/MetricsConfig.java
+++ b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
@@ -107,6 +107,30 @@ public static MetricsConfig forPositionDelete(Table table) {
     return new MetricsConfig(columnModes.build(), defaultMode);
   }
 
+  /**
+   * Creates a metrics config for a position delete file.
+   *
+   * @param props table configuration
+   */
+  public static MetricsConfig forPositionDelete(Map<String, String> props) {
+    ImmutableMap.Builder<String, MetricsMode> columnModes = ImmutableMap.builder();
+
+    columnModes.put(MetadataColumns.DELETE_FILE_PATH.name(), MetricsModes.Full.get());
+    columnModes.put(MetadataColumns.DELETE_FILE_POS.name(), MetricsModes.Full.get());
+
+    MetricsConfig tableConfig = from(props, null, null);
+
+    MetricsMode defaultMode = tableConfig.defaultMode;
+    tableConfig.columnModes.forEach(
+        (columnAlias, mode) -> {
+          String positionDeleteColumnAlias =
+              DOT.join(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME, columnAlias);
+          columnModes.put(positionDeleteColumnAlias, mode);
+        });
+
+    return new MetricsConfig(columnModes.build(), defaultMode);
+  }
+
   /**
    * Generate a MetricsConfig for all columns based on overrides, schema, and sort order.
    *
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index 85289eceff15..36c00aa43685 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -216,7 +216,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
   @Override
   public PositionDeleteWriter<RowData> newPosDeleteWriter(
       EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(props);
     try {
       switch (format) {
         case AVRO:
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index 85289eceff15..36c00aa43685 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -216,7 +216,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
   @Override
   public PositionDeleteWriter<RowData> newPosDeleteWriter(
       EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(props);
     try {
       switch (format) {
         case AVRO:
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index 85289eceff15..36c00aa43685 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -216,7 +216,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
   @Override
   public PositionDeleteWriter<RowData> newPosDeleteWriter(
       EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(props);
    try {
       switch (format) {
         case AVRO:

From 205638ddcb5b31233049e08b29c3f3b184128b7d Mon Sep 17 00:00:00 2001
From: Junjie Chen
Date: Wed, 30 Nov 2022 13:09:59 +0800
Subject: [PATCH 02/10] refactor

---
 .../org/apache/iceberg/MetricsConfig.java | 29 +++++--------------
 1 file changed, 8 insertions(+), 21 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/MetricsConfig.java b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
index 6152b69140de..8c3598bb13d5 100644
--- a/core/src/main/java/org/apache/iceberg/MetricsConfig.java
+++ b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
@@ -89,39 +89,26 @@ public static MetricsConfig forTable(Table table) {
    * @param table an Iceberg table
    */
   public static MetricsConfig forPositionDelete(Table table) {
-    ImmutableMap.Builder<String, MetricsMode> columnModes = ImmutableMap.builder();
-
-    columnModes.put(MetadataColumns.DELETE_FILE_PATH.name(), MetricsModes.Full.get());
-    columnModes.put(MetadataColumns.DELETE_FILE_POS.name(), MetricsModes.Full.get());
-
-    MetricsConfig tableConfig = forTable(table);
-
-    MetricsMode defaultMode = tableConfig.defaultMode;
-    tableConfig.columnModes.forEach(
-        (columnAlias, mode) -> {
-          String positionDeleteColumnAlias =
-              DOT.join(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME, columnAlias);
-          columnModes.put(positionDeleteColumnAlias, mode);
-        });
-
-    return new MetricsConfig(columnModes.build(), defaultMode);
+    return forPositionDelete(forTable(table));
   }
 
   /**
    * Creates a metrics config for a position delete file.
    *
-   * @param props table configuration
+   * @param props table properties
    */
   public static MetricsConfig forPositionDelete(Map<String, String> props) {
+    return forPositionDelete(from(props, null, null));
+  }
+
+  private static MetricsConfig forPositionDelete(MetricsConfig config) {
     ImmutableMap.Builder<String, MetricsMode> columnModes = ImmutableMap.builder();
 
     columnModes.put(MetadataColumns.DELETE_FILE_PATH.name(), MetricsModes.Full.get());
     columnModes.put(MetadataColumns.DELETE_FILE_POS.name(), MetricsModes.Full.get());
 
-    MetricsConfig tableConfig = from(props, null, null);
-
-    MetricsMode defaultMode = tableConfig.defaultMode;
-    tableConfig.columnModes.forEach(
+    MetricsMode defaultMode = config.defaultMode;
+    config.columnModes.forEach(
         (columnAlias, mode) -> {
           String positionDeleteColumnAlias =
               DOT.join(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME, columnAlias);
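The two patches above introduce MetricsConfig.forPositionDelete and then collapse the Table and Map<String, String> overloads onto a shared private helper. A rough usage sketch of the resulting behavior, not part of the patch itself (the write.metadata.metrics.* keys are the standard Iceberg table properties; the file_path and pos names and the "row." prefix come from MetadataColumns):

    Map<String, String> props =
        ImmutableMap.of(
            "write.metadata.metrics.default", "counts",
            "write.metadata.metrics.column.data", "truncate(16)");

    // Position delete files always keep full metrics for the path/position
    // columns, while the table's per-column settings are re-applied to the
    // nested delete row under the "row." prefix.
    MetricsConfig config = MetricsConfig.forPositionDelete(props);
    config.columnMode("file_path"); // full
    config.columnMode("pos");       // full
    config.columnMode("row.data");  // truncate(16), from the table property
    config.columnMode("row.id");    // counts, the table default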
From 5d6314190062c93cbfc0a623c73ced33c5c9d924 Mon Sep 17 00:00:00 2001
From: Junjie Chen
Date: Fri, 2 Dec 2022 15:04:58 +0800
Subject: [PATCH 03/10] rebase

---
 build.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/build.gradle b/build.gradle
index 9aa9e9c9d88b..bfc9822127b1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -111,7 +111,7 @@ subprojects {
     revapi {
       oldGroup = project.group
       oldName = project.name
-      oldVersion = "1.0.0"
+      oldVersion = "1.1.0"
     }
   }
 

From c44d0364f47669b65b8fc8493f91a326f9a8f083 Mon Sep 17 00:00:00 2001
From: Junjie Chen
Date: Tue, 6 Dec 2022 11:44:18 +0800
Subject: [PATCH 04/10] refactor according to steven's comments

---
 .palantir/revapi.yml                          |  6 ----
 .../org/apache/iceberg/MetricsConfig.java     | 19 +++-------
 .../flink/sink/FlinkAppenderFactory.java      | 34 ++++++++++++++---
 .../flink/sink/RowDataTaskWriterFactory.java  |  6 +++-
 .../flink/sink/FlinkAppenderFactory.java      | 34 ++++++++++++++---
 .../flink/sink/RowDataTaskWriterFactory.java  |  6 +++-
 .../flink/sink/FlinkAppenderFactory.java      | 34 ++++++++++++++---
 .../flink/sink/RowDataTaskWriterFactory.java  |  6 +++-
 8 files changed, 109 insertions(+), 36 deletions(-)

diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index ea2a6b88cbfd..a4a7559b8e71 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -65,12 +65,6 @@ acceptedBreaks:
     - code: "java.method.removed"
       old: "method void org.apache.iceberg.io.DataWriter<T>::add(T)"
       justification: "Removing deprecated method"
-  "1.1.0":
-    org.apache.iceberg:iceberg-core:
-    - code: "java.method.addedToInterface"
-      new: "method org.apache.iceberg.MetricsConfig org.apache.iceberg.MetricsConfig::forPositionDelete(\
-        java.util.Map<java.lang.String, java.lang.String>)"
-      justification: "add new interface for flink metrics config"
   apache-iceberg-0.14.0:
     org.apache.iceberg:iceberg-api:
     - code: "java.class.defaultSerializationChanged"
diff --git a/core/src/main/java/org/apache/iceberg/MetricsConfig.java b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
index 8c3598bb13d5..9315fde5d94a 100644
--- a/core/src/main/java/org/apache/iceberg/MetricsConfig.java
+++ b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
@@ -89,26 +89,15 @@ public static MetricsConfig forTable(Table table) {
    * @param table an Iceberg table
    */
   public static MetricsConfig forPositionDelete(Table table) {
-    return forPositionDelete(forTable(table));
-  }
-
-  /**
-   * Creates a metrics config for a position delete file.
-   *
-   * @param props table properties
-   */
-  public static MetricsConfig forPositionDelete(Map<String, String> props) {
-    return forPositionDelete(from(props, null, null));
-  }
-
-  private static MetricsConfig forPositionDelete(MetricsConfig config) {
     ImmutableMap.Builder<String, MetricsMode> columnModes = ImmutableMap.builder();
 
     columnModes.put(MetadataColumns.DELETE_FILE_PATH.name(), MetricsModes.Full.get());
     columnModes.put(MetadataColumns.DELETE_FILE_POS.name(), MetricsModes.Full.get());
 
-    MetricsMode defaultMode = config.defaultMode;
-    config.columnModes.forEach(
+    MetricsConfig tableConfig = forTable(table);
+
+    MetricsMode defaultMode = tableConfig.defaultMode;
+    tableConfig.columnModes.forEach(
         (columnAlias, mode) -> {
           String positionDeleteColumnAlias =
               DOT.join(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME, columnAlias);
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index 36c00aa43685..0c36306dc1b1 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -30,6 +30,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -55,13 +56,14 @@ public class FlinkAppenderFactory implements FileAppenderFactory<RowData>, Seria
   private final int[] equalityFieldIds;
   private final Schema eqDeleteRowSchema;
   private final Schema posDeleteRowSchema;
+  private final Table table;
 
   private RowType eqDeleteFlinkSchema = null;
   private RowType posDeleteFlinkSchema = null;
 
   public FlinkAppenderFactory(
       Schema schema, RowType flinkSchema, Map<String, String> props, PartitionSpec spec) {
-    this(schema, flinkSchema, props, spec, null, null, null);
+    this(null, schema, flinkSchema, props, spec, null, null, null);
   }
 
   public FlinkAppenderFactory(
@@ -72,6 +74,27 @@ public FlinkAppenderFactory(
       int[] equalityFieldIds,
       Schema eqDeleteRowSchema,
       Schema posDeleteRowSchema) {
+    this(
+        null,
+        schema,
+        flinkSchema,
+        props,
+        spec,
+        equalityFieldIds,
+        eqDeleteRowSchema,
+        posDeleteRowSchema);
+  }
+
+  public FlinkAppenderFactory(
+      Table table,
+      Schema schema,
+      RowType flinkSchema,
+      Map<String, String> props,
+      PartitionSpec spec,
+      int[] equalityFieldIds,
+      Schema eqDeleteRowSchema,
+      Schema posDeleteRowSchema) {
+    this.table = table;
     this.schema = schema;
     this.flinkSchema = flinkSchema;
     this.props = props;
@@ -99,7 +122,8 @@ private RowType lazyPosDeleteFlinkSchema() {
 
   @Override
   public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig =
+        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
     try {
       switch (format) {
         case AVRO:
@@ -160,7 +184,8 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
         eqDeleteRowSchema,
         "Equality delete row schema shouldn't be null when creating equality-delete writer");
 
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig =
+        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
     try {
       switch (format) {
         case AVRO:
@@ -216,7 +241,8 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
   @Override
   public PositionDeleteWriter<RowData> newPosDeleteWriter(
       EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
-    MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(props);
+    MetricsConfig metricsConfig =
+        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
     try {
       switch (format) {
         case AVRO:
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 634c2dfddaed..c624eb3f0276 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -72,7 +72,9 @@ public RowDataTaskWriterFactory(
     this.upsert = upsert;
 
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
-      this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, writeProperties, spec);
+      this.appenderFactory =
+          new FlinkAppenderFactory(
+              table, schema, flinkSchema, writeProperties, spec, null, null, null);
     } else if (upsert) {
       // In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of
       // the inserted row
@@ -81,6 +83,7 @@ public RowDataTaskWriterFactory(
       // that are correct for the deleted row. Therefore, only write the equality delete fields.
       this.appenderFactory =
           new FlinkAppenderFactory(
+              table,
               schema,
               flinkSchema,
               writeProperties,
@@ -91,6 +94,7 @@ public RowDataTaskWriterFactory(
     } else {
       this.appenderFactory =
           new FlinkAppenderFactory(
+              table,
               schema,
               flinkSchema,
               writeProperties,
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index 36c00aa43685..0c36306dc1b1 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -30,6 +30,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -55,13 +56,14 @@ public class FlinkAppenderFactory implements FileAppenderFactory<RowData>, Seria
   private final int[] equalityFieldIds;
   private final Schema eqDeleteRowSchema;
   private final Schema posDeleteRowSchema;
+  private final Table table;
 
   private RowType eqDeleteFlinkSchema = null;
   private RowType posDeleteFlinkSchema = null;
 
   public FlinkAppenderFactory(
       Schema schema, RowType flinkSchema, Map<String, String> props, PartitionSpec spec) {
-    this(schema, flinkSchema, props, spec, null, null, null);
+    this(null, schema, flinkSchema, props, spec, null, null, null);
   }
 
   public FlinkAppenderFactory(
@@ -72,6 +74,27 @@ public FlinkAppenderFactory(
       int[] equalityFieldIds,
       Schema eqDeleteRowSchema,
       Schema posDeleteRowSchema) {
+    this(
+        null,
+        schema,
+        flinkSchema,
+        props,
+        spec,
+        equalityFieldIds,
+        eqDeleteRowSchema,
+        posDeleteRowSchema);
+  }
+
+  public FlinkAppenderFactory(
+      Table table,
+      Schema schema,
+      RowType flinkSchema,
+      Map<String, String> props,
+      PartitionSpec spec,
+      int[] equalityFieldIds,
+      Schema eqDeleteRowSchema,
+      Schema posDeleteRowSchema) {
+    this.table = table;
     this.schema = schema;
     this.flinkSchema = flinkSchema;
     this.props = props;
@@ -99,7 +122,8 @@ private RowType lazyPosDeleteFlinkSchema() {
 
   @Override
   public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig =
+        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
     try {
       switch (format) {
         case AVRO:
@@ -160,7 +184,8 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
         eqDeleteRowSchema,
         "Equality delete row schema shouldn't be null when creating equality-delete writer");
 
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig =
+        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
     try {
       switch (format) {
         case AVRO:
@@ -216,7 +241,8 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
   @Override
   public PositionDeleteWriter<RowData> newPosDeleteWriter(
       EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
-    MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(props);
+    MetricsConfig metricsConfig =
+        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
     try {
       switch (format) {
         case AVRO:
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 634c2dfddaed..c624eb3f0276 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -72,7 +72,9 @@ public RowDataTaskWriterFactory(
     this.upsert = upsert;
 
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
-      this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, writeProperties, spec);
+      this.appenderFactory =
+          new FlinkAppenderFactory(
+              table, schema, flinkSchema, writeProperties, spec, null, null, null);
     } else if (upsert) {
       // In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of
       // the inserted row
@@ -81,6 +83,7 @@ public RowDataTaskWriterFactory(
       // that are correct for the deleted row. Therefore, only write the equality delete fields.
       this.appenderFactory =
           new FlinkAppenderFactory(
+              table,
               schema,
               flinkSchema,
               writeProperties,
@@ -91,6 +94,7 @@ public RowDataTaskWriterFactory(
     } else {
       this.appenderFactory =
           new FlinkAppenderFactory(
+              table,
               schema,
               flinkSchema,
               writeProperties,
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index 36c00aa43685..0c36306dc1b1 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -30,6 +30,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -55,13 +56,14 @@ public class FlinkAppenderFactory implements FileAppenderFactory<RowData>, Seria
   private final int[] equalityFieldIds;
   private final Schema eqDeleteRowSchema;
   private final Schema posDeleteRowSchema;
+  private final Table table;
 
   private RowType eqDeleteFlinkSchema = null;
   private RowType posDeleteFlinkSchema = null;
 
   public FlinkAppenderFactory(
       Schema schema, RowType flinkSchema, Map<String, String> props, PartitionSpec spec) {
-    this(schema, flinkSchema, props, spec, null, null, null);
+    this(null, schema, flinkSchema, props, spec, null, null, null);
   }
 
   public FlinkAppenderFactory(
@@ -72,6 +74,27 @@ public FlinkAppenderFactory(
       int[] equalityFieldIds,
       Schema eqDeleteRowSchema,
       Schema posDeleteRowSchema) {
+    this(
+        null,
+        schema,
+        flinkSchema,
+        props,
+        spec,
+        equalityFieldIds,
+        eqDeleteRowSchema,
+        posDeleteRowSchema);
+  }
+
+  public FlinkAppenderFactory(
+      Table table,
+      Schema schema,
+      RowType flinkSchema,
+      Map<String, String> props,
+      PartitionSpec spec,
+      int[] equalityFieldIds,
+      Schema eqDeleteRowSchema,
+      Schema posDeleteRowSchema) {
+    this.table = table;
     this.schema = schema;
     this.flinkSchema = flinkSchema;
     this.props = props;
@@ -99,7 +122,8 @@ private RowType lazyPosDeleteFlinkSchema() {
 
   @Override
   public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig =
+        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
     try {
       switch (format) {
         case AVRO:
@@ -160,7 +184,8 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
         eqDeleteRowSchema,
         "Equality delete row schema shouldn't be null when creating equality-delete writer");
 
-    MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig =
+        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
     try {
       switch (format) {
         case AVRO:
@@ -216,7 +241,8 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
   @Override
   public PositionDeleteWriter<RowData> newPosDeleteWriter(
       EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
-    MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(props);
+    MetricsConfig metricsConfig =
+        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
     try {
       switch (format) {
         case AVRO:
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 634c2dfddaed..c624eb3f0276 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -72,7 +72,9 @@ public RowDataTaskWriterFactory(
     this.upsert = upsert;
 
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
-      this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, writeProperties, spec);
+      this.appenderFactory =
+          new FlinkAppenderFactory(
+              table, schema, flinkSchema, writeProperties, spec, null, null, null);
     } else if (upsert) {
       // In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of
       // the inserted row
@@ -81,6 +83,7 @@ public RowDataTaskWriterFactory(
       // that are correct for the deleted row. Therefore, only write the equality delete fields.
       this.appenderFactory =
          new FlinkAppenderFactory(
+              table,
               schema,
               flinkSchema,
               writeProperties,
@@ -91,6 +94,7 @@ public RowDataTaskWriterFactory(
     } else {
       this.appenderFactory =
           new FlinkAppenderFactory(
+              table,
               schema,
               flinkSchema,
               writeProperties,
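Patch 04 threads an optional Table through FlinkAppenderFactory so the metrics config can be derived from table metadata rather than raw properties. A minimal construction sketch mirroring the RowDataTaskWriterFactory call sites above; the table parameter is assumed to be an already-loaded Iceberg Table, and everything else is taken from it:

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.FlinkSchemaUtil;
    import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
    import org.apache.iceberg.io.FileAppenderFactory;

    static FileAppenderFactory<RowData> plainAppendFactory(Table table) {
      RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
      // With a non-null table, the writers use MetricsConfig.forTable(table);
      // at this point in the series a null table still falls back to
      // MetricsConfig.fromProperties(props).
      return new FlinkAppenderFactory(
          table,
          table.schema(),
          flinkSchema,
          table.properties(),
          table.spec(),
          null, // equalityFieldIds: plain appends, no equality deletes
          null, // eqDeleteRowSchema
          null); // posDeleteRowSchema
    }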
From d75c3d43340d07d921f42697ac29c9c8f478cac5 Mon Sep 17 00:00:00 2001
From: Junjie Chen
Date: Tue, 6 Dec 2022 21:00:56 +0800
Subject: [PATCH 05/10] tmp

---
 .../src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java  | 1 +
 .../java/org/apache/iceberg/flink/sink/TestFlinkManifest.java   | 1 +
 .../apache/iceberg/flink/sink/TestIcebergFilesCommitter.java    | 2 +-
 3 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index f5b60c63d708..c767212cdb21 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -125,6 +125,7 @@ public static RowData createUpdateAfter(Integer id, String data) {
   }
 
   public static DataFile writeFile(
+      Table table,
       Schema schema,
       PartitionSpec spec,
       Configuration conf,
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 27dc665055cc..92af682dc883 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -254,6 +254,7 @@ public ManifestFile deserialize(int version, byte[] serialized) throws IOExcepti
 
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
     return SimpleDataUtil.writeFile(
+        table,
         table.schema(),
         table.spec(),
         CONF,
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 6fc4a5639fcf..704808336254 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -807,7 +807,7 @@ private List<ManifestFile> assertFlinkManifests(int expectedCount) throws IOException {
   }
 
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
-    return SimpleDataUtil.writeFile(
+    return SimpleDataUtil.writeFile(table,
         table.schema(), table.spec(), CONF, table.location(), format.addExtension(filename), rows);
   }
 

From 4b885c316f4f31664a85bcb3dc6c6513add70631 Mon Sep 17 00:00:00 2001
From: Junjie Chen
Date: Tue, 13 Dec 2022 14:33:46 +0800
Subject: [PATCH 06/10] tmp

---
 .../org/apache/iceberg/flink/sink/FlinkAppenderFactory.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index 0c36306dc1b1..5cfd4e11931c 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -242,7 +242,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
   public PositionDeleteWriter<RowData> newPosDeleteWriter(
       EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
     MetricsConfig metricsConfig =
-        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
+        table != null ? MetricsConfig.forPositionDelete(table) : MetricsConfig.fromProperties(props);
     try {
       switch (format) {
         case AVRO:

From 0a9337f45a204db374599a33575364e6168a0053 Mon Sep 17 00:00:00 2001
From: Junjie Chen
Date: Tue, 13 Dec 2022 15:29:25 +0800
Subject: [PATCH 07/10] refactor ctor

---
 .../flink/sink/FlinkAppenderFactory.java      | 34 +++----------------
 .../apache/iceberg/flink/SimpleDataUtil.java  |  3 +-
 .../flink/sink/TestFlinkAppenderFactory.java  |  1 +
 .../iceberg/flink/sink/TestFlinkManifest.java |  1 +
 .../flink/sink/TestIcebergFilesCommitter.java | 11 ++++--
 .../flink/source/TestFlinkMergingMetrics.java | 19 ++++++++++-
 6 files changed, 35 insertions(+), 34 deletions(-)

diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index 5cfd4e11931c..8d99aa162fb9 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -61,30 +61,6 @@ public class FlinkAppenderFactory implements FileAppenderFactory<RowData>, Seria
   private RowType eqDeleteFlinkSchema = null;
   private RowType posDeleteFlinkSchema = null;
 
-  public FlinkAppenderFactory(
-      Schema schema, RowType flinkSchema, Map<String, String> props, PartitionSpec spec) {
-    this(null, schema, flinkSchema, props, spec, null, null, null);
-  }
-
-  public FlinkAppenderFactory(
-      Schema schema,
-      RowType flinkSchema,
-      Map<String, String> props,
-      PartitionSpec spec,
-      int[] equalityFieldIds,
-      Schema eqDeleteRowSchema,
-      Schema posDeleteRowSchema) {
-    this(
-        null,
-        schema,
-        flinkSchema,
-        props,
-        spec,
-        equalityFieldIds,
-        eqDeleteRowSchema,
-        posDeleteRowSchema);
-  }
-
   public FlinkAppenderFactory(
       Table table,
       Schema schema,
@@ -94,6 +70,7 @@ public FlinkAppenderFactory(
       int[] equalityFieldIds,
       Schema eqDeleteRowSchema,
       Schema posDeleteRowSchema) {
+    Preconditions.checkNotNull(table, "table should not be null");
     this.table = table;
     this.schema = schema;
     this.flinkSchema = flinkSchema;
@@ -122,8 +99,7 @@ private RowType lazyPosDeleteFlinkSchema() {
 
   @Override
   public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
-    MetricsConfig metricsConfig =
-        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
     try {
       switch (format) {
         case AVRO:
@@ -184,8 +160,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
         eqDeleteRowSchema,
         "Equality delete row schema shouldn't be null when creating equality-delete writer");
 
-    MetricsConfig metricsConfig =
-        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
     try {
       switch (format) {
         case AVRO:
@@ -241,8 +216,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
   @Override
   public PositionDeleteWriter<RowData> newPosDeleteWriter(
       EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
-    MetricsConfig metricsConfig =
-        table != null ? MetricsConfig.forPositionDelete(table) : MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table);
     try {
       switch (format) {
         case AVRO:
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index c767212cdb21..1bd9c34e1c06 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -139,7 +139,8 @@ public static DataFile writeFile(
     RowType flinkSchema = FlinkSchemaUtil.convert(schema);
 
     FileAppenderFactory<RowData> appenderFactory =
-        new FlinkAppenderFactory(schema, flinkSchema, ImmutableMap.of(), spec);
+        new FlinkAppenderFactory(
+            table, schema, flinkSchema, ImmutableMap.of(), spec, null, null, null);
 
     FileAppender<RowData> appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat);
     try (FileAppender<RowData> closeableAppender = appender) {
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
index 4c17cd7607df..d25b2792ac65 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
@@ -43,6 +43,7 @@ public TestFlinkAppenderFactory(String fileFormat, boolean partitioned) {
   protected FileAppenderFactory<RowData> createAppenderFactory(
       List<Integer> equalityFieldIds, Schema eqDeleteSchema, Schema posDeleteRowSchema) {
     return new FlinkAppenderFactory(
+        table,
         table.schema(),
         rowType,
         table.properties(),
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 92af682dc883..36801420106e 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -78,6 +78,7 @@ public void before() throws IOException {
         };
     this.appenderFactory =
         new FlinkAppenderFactory(
+            table,
            table.schema(),
            FlinkSchemaUtil.convert(table.schema()),
            table.properties(),
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 704808336254..5a4cf6223c26 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -766,6 +766,7 @@ private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
           table.schema().findField("id").fieldId(), table.schema().findField("data").fieldId()
         };
     return new FlinkAppenderFactory(
+        table,
         table.schema(),
         FlinkSchemaUtil.convert(table.schema()),
         table.properties(),
@@ -807,8 +808,14 @@ private List<ManifestFile> assertFlinkManifests(int expectedCount) throws IOException {
 
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
-    return SimpleDataUtil.writeFile(table,
-        table.schema(), table.spec(), CONF, table.location(), format.addExtension(filename), rows);
+    return SimpleDataUtil.writeFile(
+        table,
+        table.schema(),
+        table.spec(),
+        CONF,
+        table.location(),
+        format.addExtension(filename),
+        rows);
   }
 
   private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
index 3a7ec96cb1d6..1d52acb2fe7b 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
@@ -27,13 +27,23 @@
 import org.apache.iceberg.TestMergingMetrics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopTableResource;
 import org.apache.iceberg.flink.RowDataConverter;
 import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
 
 public class TestFlinkMergingMetrics extends TestMergingMetrics<RowData> {
+  @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopTableResource tableResource =
+      new HadoopTableResource(TEMP_FOLDER, "test_db", "test_table", SCHEMA);
+
   public TestFlinkMergingMetrics(FileFormat fileFormat) {
     super(fileFormat);
   }
@@ -44,7 +54,14 @@ protected FileAppender<RowData> writeAndGetAppender(List<Record> records) throws
 
     FileAppender<RowData> appender =
         new FlinkAppenderFactory(
-                SCHEMA, flinkSchema, ImmutableMap.of(), PartitionSpec.unpartitioned())
+                tableResource.table(),
+                SCHEMA,
+                flinkSchema,
+                ImmutableMap.of(),
+                PartitionSpec.unpartitioned(),
+                null,
+                null,
+                null)
             .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
     try (FileAppender<RowData> fileAppender = appender) {
       records.stream().map(r -> RowDataConverter.convert(SCHEMA, r)).forEach(fileAppender::add);
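Patch 07 is the substance of the fix on Flink 1.16: the factory now requires a non-null table, and the position delete writer derives its metrics from MetricsConfig.forPositionDelete(table) instead of the generic table properties. A before/after sketch of the difference, assuming a hypothetical table whose metrics are disabled by default (delete-file pruning by file_path bounds is the reader-side behavior this is meant to protect):

    // Table property: write.metadata.metrics.default = "none"

    // Before: position delete files inherited the table-level setting, so
    // their file_path/pos columns carried no lower/upper bounds in manifests.
    MetricsConfig before = MetricsConfig.fromProperties(table.properties());

    // After: file_path and pos always get full metrics, letting readers skip
    // delete files that cannot reference a given data file.
    MetricsConfig after = MetricsConfig.forPositionDelete(table);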
From a762972eaf1f9811c95da295fccf8f96dd9b58b8 Mon Sep 17 00:00:00 2001
From: Junjie Chen
Date: Tue, 13 Dec 2022 16:02:37 +0800
Subject: [PATCH 08/10] fix for flink 1.14

---
 .../flink/sink/FlinkAppenderFactory.java      | 24 ------------------
 .../apache/iceberg/flink/SimpleDataUtil.java  |  4 +++-
 .../flink/sink/TestFlinkAppenderFactory.java  |  2 +-
 .../iceberg/flink/sink/TestFlinkManifest.java |  2 ++
 .../flink/sink/TestIcebergFilesCommitter.java |  3 ++-
 .../flink/source/TestFlinkMergingMetrics.java | 19 ++++++++++++++-
 6 files changed, 26 insertions(+), 28 deletions(-)

diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index 0c36306dc1b1..e3826e426ea3 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -61,30 +61,6 @@ public class FlinkAppenderFactory implements FileAppenderFactory<RowData>, Seria
   private RowType eqDeleteFlinkSchema = null;
   private RowType posDeleteFlinkSchema = null;
 
-  public FlinkAppenderFactory(
-      Schema schema, RowType flinkSchema, Map<String, String> props, PartitionSpec spec) {
-    this(null, schema, flinkSchema, props, spec, null, null, null);
-  }
-
-  public FlinkAppenderFactory(
-      Schema schema,
-      RowType flinkSchema,
-      Map<String, String> props,
-      PartitionSpec spec,
-      int[] equalityFieldIds,
-      Schema eqDeleteRowSchema,
-      Schema posDeleteRowSchema) {
-    this(
-        null,
-        schema,
-        flinkSchema,
-        props,
-        spec,
-        equalityFieldIds,
-        eqDeleteRowSchema,
-        posDeleteRowSchema);
-  }
-
   public FlinkAppenderFactory(
       Table table,
       Schema schema,
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 7a7bfdfff3df..e50123ab9137 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -125,6 +125,7 @@ public static RowData createUpdateAfter(Integer id, String data) {
   }
 
   public static DataFile writeFile(
+      Table table,
       Schema schema,
       PartitionSpec spec,
       Configuration conf,
@@ -138,7 +139,8 @@ public static DataFile writeFile(
     RowType flinkSchema = FlinkSchemaUtil.convert(schema);
 
     FileAppenderFactory<RowData> appenderFactory =
-        new FlinkAppenderFactory(schema, flinkSchema, ImmutableMap.of(), spec);
+        new FlinkAppenderFactory(
+            table, schema, flinkSchema, ImmutableMap.of(), spec, null, null, null);
 
     FileAppender<RowData> appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat);
     try (FileAppender<RowData> closeableAppender = appender) {
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
index 4c17cd7607df..282f993824bc 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
@@ -31,7 +31,6 @@ import org.apache.iceberg.util.StructLikeSet;
 
 public class TestFlinkAppenderFactory extends TestAppenderFactory<RowData> {
-
   private final RowType rowType;
 
   public TestFlinkAppenderFactory(String fileFormat, boolean partitioned) {
@@ -43,6 +42,7 @@ public TestFlinkAppenderFactory(String fileFormat, boolean partitioned) {
   protected FileAppenderFactory<RowData> createAppenderFactory(
       List<Integer> equalityFieldIds, Schema eqDeleteSchema, Schema posDeleteRowSchema) {
     return new FlinkAppenderFactory(
+        table,
         table.schema(),
         rowType,
         table.properties(),
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 7fe4e159fc61..5528e71b3db1 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -79,6 +79,7 @@ public void before() throws IOException {
         };
     this.appenderFactory =
         new FlinkAppenderFactory(
+            table,
             table.schema(),
             FlinkSchemaUtil.convert(table.schema()),
             table.properties(),
@@ -255,6 +256,7 @@ public ManifestFile deserialize(int version, byte[] serialized) throws IOExcepti
 
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
     return SimpleDataUtil.writeFile(
+        table,
         table.schema(),
         table.spec(),
         CONF,
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index e27b9093f19d..ca643ad53229 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -771,6 +771,7 @@ private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
           table.schema().findField("id").fieldId(), table.schema().findField("data").fieldId()
         };
     return new FlinkAppenderFactory(
+        table,
         table.schema(),
         FlinkSchemaUtil.convert(table.schema()),
         table.properties(),
@@ -813,7 +814,7 @@ private List<ManifestFile> assertFlinkManifests(int expectedCount) throws IOException {
 
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
     return SimpleDataUtil.writeFile(
-        table.schema(), table.spec(), CONF, tablePath, format.addExtension(filename), rows);
+        table, table.schema(), table.spec(), CONF, tablePath, format.addExtension(filename), rows);
   }
 
   private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
index 3a7ec96cb1d6..291d66538f6b 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
@@ -27,13 +27,23 @@
 import org.apache.iceberg.TestMergingMetrics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopTableResource;
 import org.apache.iceberg.flink.RowDataConverter;
 import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
 
 public class TestFlinkMergingMetrics extends TestMergingMetrics<RowData> {
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopTableResource tableResource =
+      new HadoopTableResource(TEMPORARY_FOLDER, "test_db", "test_table", SCHEMA);
+
   public TestFlinkMergingMetrics(FileFormat fileFormat) {
     super(fileFormat);
   }
@@ -44,7 +54,14 @@ protected FileAppender<RowData> writeAndGetAppender(List<Record> records) throws
 
     FileAppender<RowData> appender =
         new FlinkAppenderFactory(
-                SCHEMA, flinkSchema, ImmutableMap.of(), PartitionSpec.unpartitioned())
+                tableResource.table(),
+                SCHEMA,
+                flinkSchema,
+                ImmutableMap.of(),
+                PartitionSpec.unpartitioned(),
+                null,
+                null,
+                null)
             .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
     try (FileAppender<RowData> fileAppender = appender) {
       records.stream().map(r -> RowDataConverter.convert(SCHEMA, r)).forEach(fileAppender::add);

From 4de6115cc7c79b9a00194d35ee325ef59b13ba39 Mon Sep 17 00:00:00 2001
From: Junjie Chen
Date: Tue, 13 Dec 2022 16:08:44 +0800
Subject: [PATCH 09/10] fix flink 1.15

---
 .../flink/sink/FlinkAppenderFactory.java      | 24 ------------------
 .../apache/iceberg/flink/SimpleDataUtil.java  |  4 +++-
 .../flink/sink/TestFlinkAppenderFactory.java  |  1 +
 .../iceberg/flink/sink/TestFlinkManifest.java |  2 ++
 .../flink/sink/TestIcebergFilesCommitter.java |  9 ++++++-
 .../flink/source/TestFlinkMergingMetrics.java | 18 +++++++++++++-
 6 files changed, 31 insertions(+), 27 deletions(-)

diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index 0c36306dc1b1..e3826e426ea3 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -61,30 +61,6 @@ public class FlinkAppenderFactory implements FileAppenderFactory<RowData>, Seria
   private RowType eqDeleteFlinkSchema = null;
   private RowType posDeleteFlinkSchema = null;
 
-  public FlinkAppenderFactory(
-      Schema schema, RowType flinkSchema, Map<String, String> props, PartitionSpec spec) {
-    this(null, schema, flinkSchema, props, spec, null, null, null);
-  }
-
-  public FlinkAppenderFactory(
-      Schema schema,
-      RowType flinkSchema,
-      Map<String, String> props,
-      PartitionSpec spec,
-      int[] equalityFieldIds,
-      Schema eqDeleteRowSchema,
-      Schema posDeleteRowSchema) {
-    this(
-        null,
-        schema,
-        flinkSchema,
-        props,
-        spec,
-        equalityFieldIds,
-        eqDeleteRowSchema,
-        posDeleteRowSchema);
-  }
-
   public FlinkAppenderFactory(
       Table table,
       Schema schema,
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index f5b60c63d708..1bd9c34e1c06 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -125,6 +125,7 @@ public static RowData createUpdateAfter(Integer id, String data) {
   }
 
   public static DataFile writeFile(
+      Table table,
       Schema schema,
       PartitionSpec spec,
       Configuration conf,
@@ -138,7 +139,8 @@ public static DataFile writeFile(
     RowType flinkSchema = FlinkSchemaUtil.convert(schema);
 
     FileAppenderFactory<RowData> appenderFactory =
-        new FlinkAppenderFactory(schema, flinkSchema, ImmutableMap.of(), spec);
+        new FlinkAppenderFactory(
+            table, schema, flinkSchema, ImmutableMap.of(), spec, null, null, null);
 
     FileAppender<RowData> appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat);
     try (FileAppender<RowData> closeableAppender = appender) {
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
index 4c17cd7607df..d25b2792ac65 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkAppenderFactory.java
@@ -43,6 +43,7 @@ public TestFlinkAppenderFactory(String fileFormat, boolean partitioned) {
   protected FileAppenderFactory<RowData> createAppenderFactory(
       List<Integer> equalityFieldIds, Schema eqDeleteSchema, Schema posDeleteRowSchema) {
     return new FlinkAppenderFactory(
+        table,
         table.schema(),
         rowType,
         table.properties(),
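Patches 08 and 09 port the same changes to the Flink 1.14 and 1.15 modules; because the factory now needs a real table, TestFlinkMergingMetrics gains a HadoopTableResource rule in each version. A minimal sketch of that test pattern (SCHEMA is assumed to be supplied by the TestMergingMetrics base class, as in the diffs; HadoopTableResource is the Flink test rule that creates and drops a Hadoop-catalog table around each test):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.HadoopTableResource;
    import org.junit.ClassRule;
    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.TemporaryFolder;

    public class ExampleTableTest {
      @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

      @Rule
      public final HadoopTableResource tableResource =
          new HadoopTableResource(TEMP_FOLDER, "test_db", "test_table", SCHEMA);

      @Test
      public void writesAgainstTheRuleTable() {
        Table table = tableResource.table(); // ready to pass to FlinkAppenderFactory
      }
    }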
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 27dc665055cc..36801420106e 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -78,6 +78,7 @@ public void before() throws IOException {
         };
     this.appenderFactory =
         new FlinkAppenderFactory(
+            table,
             table.schema(),
             FlinkSchemaUtil.convert(table.schema()),
             table.properties(),
@@ -254,6 +255,7 @@ public ManifestFile deserialize(int version, byte[] serialized) throws IOExcepti
 
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
     return SimpleDataUtil.writeFile(
+        table,
         table.schema(),
         table.spec(),
         CONF,
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 6fc4a5639fcf..5a4cf6223c26 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -766,6 +766,7 @@ private FileAppenderFactory<RowData> createDeletableAppenderFactory() {
           table.schema().findField("id").fieldId(), table.schema().findField("data").fieldId()
         };
     return new FlinkAppenderFactory(
+        table,
         table.schema(),
         FlinkSchemaUtil.convert(table.schema()),
         table.properties(),
@@ -808,7 +809,13 @@ private List<ManifestFile> assertFlinkManifests(int expectedCount) throws IOException {
 
   private DataFile writeDataFile(String filename, List<RowData> rows) throws IOException {
     return SimpleDataUtil.writeFile(
-        table.schema(), table.spec(), CONF, table.location(), format.addExtension(filename), rows);
+        table,
+        table.schema(),
+        table.spec(),
+        CONF,
+        table.location(),
+        format.addExtension(filename),
+        rows);
   }
 
   private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
index 3a7ec96cb1d6..30ed8a9742e1 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
@@ -27,12 +27,21 @@
 import org.apache.iceberg.TestMergingMetrics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopTableResource;
 import org.apache.iceberg.flink.RowDataConverter;
 import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
 
 public class TestFlinkMergingMetrics extends TestMergingMetrics<RowData> {
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopTableResource tableResource =
+      new HadoopTableResource(TEMPORARY_FOLDER, "test_db", "test_table", SCHEMA);
 
   public TestFlinkMergingMetrics(FileFormat fileFormat) {
     super(fileFormat);
@@ -44,7 +53,14 @@ protected FileAppender<RowData> writeAndGetAppender(List<Record> records) throws
 
     FileAppender<RowData> appender =
         new FlinkAppenderFactory(
-                SCHEMA, flinkSchema, ImmutableMap.of(), PartitionSpec.unpartitioned())
+                tableResource.table(),
+                SCHEMA,
+                flinkSchema,
+                ImmutableMap.of(),
+                PartitionSpec.unpartitioned(),
+                null,
+                null,
+                null)
             .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), fileFormat);
     try (FileAppender<RowData> fileAppender = appender) {
       records.stream().map(r -> RowDataConverter.convert(SCHEMA, r)).forEach(fileAppender::add);

From f43000bd06f3f665cd2a14bd1400cdc4134eb5d9 Mon Sep 17 00:00:00 2001
From: Junjie Chen
Date: Tue, 13 Dec 2022 16:15:57 +0800
Subject: [PATCH 10/10] add check

---
 .../iceberg/flink/sink/FlinkAppenderFactory.java | 10 ++++------
 .../iceberg/flink/sink/FlinkAppenderFactory.java | 10 ++++------
 .../iceberg/flink/sink/FlinkAppenderFactory.java |  2 +-
 3 files changed, 9 insertions(+), 13 deletions(-)

diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index e3826e426ea3..b6f1392d1562 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -70,6 +70,7 @@ public FlinkAppenderFactory(
       int[] equalityFieldIds,
       Schema eqDeleteRowSchema,
       Schema posDeleteRowSchema) {
+    Preconditions.checkNotNull(table, "Table shouldn't be null");
     this.table = table;
     this.schema = schema;
     this.flinkSchema = flinkSchema;
@@ -98,8 +99,7 @@ private RowType lazyPosDeleteFlinkSchema() {
 
   @Override
   public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
-    MetricsConfig metricsConfig =
-        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
     try {
       switch (format) {
         case AVRO:
@@ -160,8 +160,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
         eqDeleteRowSchema,
         "Equality delete row schema shouldn't be null when creating equality-delete writer");
 
-    MetricsConfig metricsConfig =
-        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
     try {
       switch (format) {
         case AVRO:
@@ -217,8 +216,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
   @Override
   public PositionDeleteWriter<RowData> newPosDeleteWriter(
       EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
-    MetricsConfig metricsConfig =
-        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table);
     try {
       switch (format) {
         case AVRO:
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index e3826e426ea3..b6f1392d1562 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -70,6 +70,7 @@ public FlinkAppenderFactory(
       int[] equalityFieldIds,
       Schema eqDeleteRowSchema,
       Schema posDeleteRowSchema) {
+    Preconditions.checkNotNull(table, "Table shouldn't be null");
     this.table = table;
     this.schema = schema;
     this.flinkSchema = flinkSchema;
@@ -98,8 +99,7 @@ private RowType lazyPosDeleteFlinkSchema() {
 
   @Override
   public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
-    MetricsConfig metricsConfig =
-        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
     try {
       switch (format) {
         case AVRO:
@@ -160,8 +160,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
         eqDeleteRowSchema,
         "Equality delete row schema shouldn't be null when creating equality-delete writer");
 
-    MetricsConfig metricsConfig =
-        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forTable(table);
     try {
       switch (format) {
         case AVRO:
@@ -217,8 +216,7 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
   @Override
   public PositionDeleteWriter<RowData> newPosDeleteWriter(
       EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
-    MetricsConfig metricsConfig =
-        table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
+    MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table);
     try {
       switch (format) {
         case AVRO:
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index 8d99aa162fb9..b6f1392d1562 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -70,7 +70,7 @@ public FlinkAppenderFactory(
       int[] equalityFieldIds,
       Schema eqDeleteRowSchema,
       Schema posDeleteRowSchema) {
-    Preconditions.checkNotNull(table, "table should not be null");
+    Preconditions.checkNotNull(table, "Table shouldn't be null");
     this.table = table;
     this.schema = schema;
     this.flinkSchema = flinkSchema;