-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Flink: use correct metric config for position deletes #6313
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flink: use correct metric config for position deletes #6313
Conversation
build.gradle
Outdated
| oldGroup = project.group | ||
| oldName = project.name | ||
| oldVersion = "1.1.0" | ||
| oldVersion = "1.0.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for build, will change back if the master pass the build.
|
@stevenzwu @hililiwei You guys may have an interest in this. |
| * | ||
| * @param props table configuration | ||
| */ | ||
| public static MetricsConfig forPositionDelete(Map<String, String> props) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we use the method from line 91?
public static MetricsConfig forPositionDelete(Table table) {
Flink write properties allows override of table properties for write (e.g. compression configs). Do we have such use cases for metrics config?
I am asking because the current fromProperties method (for data files) is deprecated in favor of the method forTable. Looks like deprecation is from PR #2240 by @szehon-ho .
/**
* Creates a metrics config from table configuration.
*
* @param props table configuration
* @deprecated use {@link MetricsConfig#forTable(Table)}
*/
@Deprecated
public static MetricsConfig fromProperties(Map<String, String> props) {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least, we can avoid code duplication by extracting common code btw these two methods, if we do need this new variant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extraction done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chenjunjiedada what do you think of using the existing API? I know it would require changes in FlinkAppenderFactory to pass in the Table object.
public static MetricsConfig forPositionDelete(Table table) {
The benefit is that position delete file will get the consistent behavior as data file. unless we are saying position delete has its own schema, shared code path doesn't really make sense.
private static MetricsConfig from(Map<String, String> props, Schema schema, SortOrder order) {
...
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, positions delete does NOT share the same behavior as data files, SortedPosDeleteWriter sorts the content.
What do you mean by has its own schema? Position deletes can have two schemas, one is [path, pos] and another is [path, pos, row]. Currently, it writes [path, pos] in BaseTaskWriter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stevenzwu I recheck the metric config, the schema and sort order are for the data row. Change to use forTable now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chenjunjiedada hmm. I think we should revert the last commit. if we are going to pass in the Table object, it should be non-null. Then we can consistently use forTable everywhere.
I would also be open with the forProperties approach, as you were saying that schema and sort order are for data rows/files. I had some reservations because MetricConfig deprecated forProperties for data rows/files. It is a little weird to introduce it back for position delete. Flink can use forPositionDelete(Table table) API by passing a valid SerializableTable object to FlinkAppenderFactory. With Table object, we can remove the Schema and PartitionSpec from the current FlinkAppenderFactory constructor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, Anton had a previous attempt for moving Flink to SerializableTable for Flink source/reader. #2987.
We can revive that effort as a separate PR? this can be merged after that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me take a look at that one fisrtly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stevenzwu , That one (#6407) is for source. This PR is for the sink side. Let me take a look at the sink as well. Maybe eventually we will change back to adopt FileWriterFactory.
| columnModes.put(MetadataColumns.DELETE_FILE_PATH.name(), MetricsModes.Full.get()); | ||
| columnModes.put(MetadataColumns.DELETE_FILE_POS.name(), MetricsModes.Full.get()); | ||
|
|
||
| MetricsConfig tableConfig = from(props, null, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this is not forTable method, it shouldn't be called tableConfig.
hililiwei
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to use this simple fix first.
904b90f to
5d63141
Compare
|
|
||
| MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); | ||
| MetricsConfig metricsConfig = | ||
| table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The data row in equality delete should share the same metrics config as the table, so I change this as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this actually makes a stronger case that we should pass in a valid Table object to the FlinkAppenderFactory as I mentioned in the other comment. we should also use forTable for position delete although it doesn't need schema or SortOrder as you said.
| EncryptedOutputFile outputFile, FileFormat format, StructLike partition) { | ||
| MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); | ||
| MetricsConfig metricsConfig = | ||
| table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be forPositionDelete
| public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) { | ||
| MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); | ||
| MetricsConfig metricsConfig = | ||
| table != null ? MetricsConfig.forTable(table) : MetricsConfig.fromProperties(props); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MetricsConfig.fromProperties is deprecated. I am wondering if we should just ensure table is never null? That would require changing the current 2 constructors (technically breaking) without adding a new constructor. but this class should be an @Internal class. what do you think? @pvary @hililiwei @chenjunjiedada
|
thanks @chenjunjiedada for the contribution |
FlinkAppenderFactoryuses the wrong metric configs for position deletes, leading to ineffective filtering. Refactoring the whole write path toFileWriterFactorycould fix this while we are usingFileAppenderFactoryfor a long time. Not sure what other benefits can bring if we change that. So here is a simple fix.