-
Notifications
You must be signed in to change notification settings - Fork 3k
Flink: Add FlinkWriterFactory #2924
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
I need a writer factory for Flink so that I can introduce new task writers. |
| builder.createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(dataFlinkType(), iSchema)); | ||
| } | ||
|
|
||
| private RowType dataFlinkType() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we move this logic to constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or maybe move this part to the Builder#build() method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are optional and may not be set/needed unless we actually write data files.
Pure DELETEs will not need it, for example.
| return this; | ||
| } | ||
|
|
||
| Builder dataFlinkType(RowType newDataFlinkType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this an optional param? In the other comment, I saw we are converting Schema (above) to RowType if dataFlinkType is not set.
Maybe add Javadoc to mark all the *FlinkType methods/params as optional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added docs to optional methods.
| } | ||
|
|
||
| private RowType positionDeleteFlinkType() { | ||
| if (positionDeleteFlinkType == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar comment as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is similar to data files. The value is optional and may not be needed.
stevenzwu
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice base test class
openinx
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
Thanks for reviewing, @openinx @stevenzwu! |
This PR adds
FlinkWriterFactorythat extendsBaseWriterFactoryintroduced in #2873.