FlinkAppenderFactory.java

@@ -30,6 +30,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -55,13 +56,14 @@ public class FlinkAppenderFactory implements FileAppenderFactory<RowData>, Seria
private final int[] equalityFieldIds;
private final Schema eqDeleteRowSchema;
private final Schema posDeleteRowSchema;
private final Table table;

private RowType eqDeleteFlinkSchema = null;
private RowType posDeleteFlinkSchema = null;

public FlinkAppenderFactory(
Schema schema, RowType flinkSchema, Map<String, String> props, PartitionSpec spec) {
this(schema, flinkSchema, props, spec, null, null, null);
this(null, schema, flinkSchema, props, spec, null, null, null);
}

public FlinkAppenderFactory(
@@ -72,6 +74,27 @@ public FlinkAppenderFactory(
int[] equalityFieldIds,
Schema eqDeleteRowSchema,
Schema posDeleteRowSchema) {
this(
null,
schema,
flinkSchema,
props,
spec,
equalityFieldIds,
eqDeleteRowSchema,
posDeleteRowSchema);
}

public FlinkAppenderFactory(
Table table,
Schema schema,
RowType flinkSchema,
Map<String, String> props,
PartitionSpec spec,
int[] equalityFieldIds,
Schema eqDeleteRowSchema,
Schema posDeleteRowSchema) {
this.table = table;
this.schema = schema;
this.flinkSchema = flinkSchema;
this.props = props;
@@ -99,7 +122,8 @@ private RowType lazyPosDeleteFlinkSchema() {

@Override
public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
MetricsConfig metricsConfig =
    table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);

stevenzwu (Contributor) commented on Dec 13, 2022:

MetricsConfig.fromProperties is deprecated. I am wondering if we should just ensure table is never null? That would require changing the current 2 constructors (technically breaking) without adding a new constructor. But this class should be an @Internal class. What do you think? @pvary @hililiwei @chenjunjiedada
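A minimal sketch of the alternative raised in this comment, assuming the two existing constructors were removed so that table can be required up front (this is not what the PR merged; Preconditions here is Iceberg's relocated Guava class, org.apache.iceberg.relocated.com.google.common.base.Preconditions):

    public FlinkAppenderFactory(
        Table table,
        Schema schema,
        RowType flinkSchema,
        Map<String, String> props,
        PartitionSpec spec,
        int[] equalityFieldIds,
        Schema eqDeleteRowSchema,
        Schema posDeleteRowSchema) {
      // Fail fast instead of branching on a null table at every MetricsConfig call site.
      this.table = Preconditions.checkNotNull(table, "Table shouldn't be null");
      this.schema = schema;
      this.flinkSchema = flinkSchema;
      this.props = props;
      // ... remaining field assignments unchanged from the PR ...
    }

With a guaranteed non-null table, each writer method below could call MetricsConfig.forTable(table) directly, with no fromProperties fallback.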
try {
switch (format) {
case AVRO:
@@ -160,7 +184,8 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
eqDeleteRowSchema,
"Equality delete row schema shouldn't be null when creating equality-delete writer");

MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
MetricsConfig metricsConfig =
    table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);

The PR author (Collaborator) commented:

The data row in an equality delete should share the same metrics config as the table, so I changed this as well.

A Contributor replied:

This actually makes a stronger case that we should pass a valid Table object to the FlinkAppenderFactory, as I mentioned in the other comment. We should also use forTable for position deletes, although they don't need the schema or SortOrder, as you said.
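Since the same null check now appears in newAppender, newEqDeleteWriter, and newPosDeleteWriter, one way to keep the fallback in a single place is a small private helper. This is a sketch only; the helper name is hypothetical and not part of this PR:

    // Sketch: centralize the table-null fallback shared by all three writer methods.
    // MetricsConfig.forTable(Table) and the deprecated MetricsConfig.fromProperties(Map)
    // are the existing Iceberg APIs used by this PR.
    private MetricsConfig metricsConfig() {
      return table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);
    }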
try {
switch (format) {
case AVRO:
@@ -216,7 +241,8 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(
@Override
public PositionDeleteWriter<RowData> newPosDeleteWriter(
EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
MetricsConfig metricsConfig =
    table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props);

A Contributor commented:

This should be forPositionDelete.
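A sketch of the suggested fix, assuming MetricsConfig.forPositionDelete(Table), the position-delete-specific variant in Iceberg core, is the intended replacement:

    // Position delete files carry file_path/pos columns (plus an optional row), so they
    // have their own metrics configuration distinct from the table's data-file config.
    MetricsConfig metricsConfig =
        table != null ? MetricsConfig.forPositionDelete(table) : MetricsConfig.fromProperties(props);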
try {
switch (format) {
case AVRO:
RowDataTaskWriterFactory.java

@@ -72,7 +72,9 @@ public RowDataTaskWriterFactory(
this.upsert = upsert;

if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, writeProperties, spec);
this.appenderFactory =
new FlinkAppenderFactory(
table, schema, flinkSchema, writeProperties, spec, null, null, null);
} else if (upsert) {
// In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of
// the inserted row
@@ -81,6 +83,7 @@
// that are correct for the deleted row. Therefore, only write the equality delete fields.
this.appenderFactory =
new FlinkAppenderFactory(
table,
schema,
flinkSchema,
writeProperties,
@@ -91,6 +94,7 @@
} else {
this.appenderFactory =
new FlinkAppenderFactory(
table,
schema,
flinkSchema,
writeProperties,