Skip to content

Conversation

@amogh-jahagirdar
Copy link
Contributor

@amogh-jahagirdar amogh-jahagirdar commented Jan 25, 2023

This change adds support to write to branches in Flink Sink via a FlinkWriteOption "branch"

@github-actions github-actions bot added the flink label Jan 25, 2023
@jackye1995 jackye1995 self-requested a review January 25, 2023 05:29
@amogh-jahagirdar amogh-jahagirdar force-pushed the flink-branch-writes branch 7 times, most recently from b682cbe to 1ee27ed Compare January 25, 2023 15:55
@amogh-jahagirdar
Copy link
Contributor Author

amogh-jahagirdar commented Jan 25, 2023

moving this out of draft.

Beyond the code wanted to discuss high level API design for branch writes using FlinkSink. In the current implementation we're using a FlinkWriteOption so it looks like this right now:

FlinkSink sink = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
    .table(table)
    .tableLoader(tableLoader)
    .set("write-format", "avro")
    .set(FlinkWriteOptions.BRANCH, "some-branch")
    .set(FlinkWriteOptions.OVERWRITE_MODE, "true")

As I implemented this though started to think we may just want to have a direct branch method on the FlinkSink builder itself. That seems more intuitive from an API perspective and is just easier to use. The API below seems better to me

FlinkSink sink = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
    .table(table)
    .tableLoader(tableLoader)
    .branch("some-branch")
    .set("write-format", "avro")
    .set(FlinkWriteOptions.OVERWRITE_MODE, "true")

@stevenzwu @yyanyy @rdblue @jackye1995 @namrathamyske Let me know your thoughts here!

@amogh-jahagirdar amogh-jahagirdar marked this pull request as ready for review January 25, 2023 16:02
@amogh-jahagirdar amogh-jahagirdar changed the title Flink: Support writes to branches Flink: Support writes to branches in FlinkSink Jan 25, 2023
@amogh-jahagirdar
Copy link
Contributor Author

Need to fix up the tests also

@amogh-jahagirdar amogh-jahagirdar force-pushed the flink-branch-writes branch 2 times, most recently from 6be254d to 9fe7b8b Compare January 25, 2023 17:53
@stevenzwu
Copy link
Contributor

As I implemented this though started to think we may just want to have a direct branch method on the FlinkSink builder itself. That seems more intuitive from an API perspective and is just easier to use.

Yeah. I also think an explicit toBranch method in the builder itself is more intuitive.

@amogh-jahagirdar amogh-jahagirdar force-pushed the flink-branch-writes branch 2 times, most recently from 1ceaf7f to 7456b04 Compare January 25, 2023 21:34
@jackye1995
Copy link
Contributor

As I implemented this though started to think we may just want to have a direct branch method on the FlinkSink builder itself. That seems more intuitive from an API perspective and is just easier to use.

I think this is more of a question about convention, I will leave to people who use Flink more often to have an opinion. To me use toBranch definitely feels cleaner.

@amogh-jahagirdar amogh-jahagirdar force-pushed the flink-branch-writes branch 3 times, most recently from c1d0dd6 to 855fb9d Compare January 25, 2023 22:58
Copy link
Contributor

@jackye1995 jackye1995 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good to me, thanks for addressing the comments!

@amogh-jahagirdar
Copy link
Contributor Author

Thanks for the reviews @stevenzwu @jackye1995 !

@pvary
Copy link
Contributor

pvary commented Jan 26, 2023

Thanks for the PR @amogh-jahagirdar!
Left one small question.

Also, do we have this feature in FlinkSource?

@amogh-jahagirdar
Copy link
Contributor Author

Thanks for the PR @amogh-jahagirdar! Left one small question.

Also, do we have this feature in FlinkSource?

Thanks for the review @pvary ! Not yet, but I'm working on a PR for it, will publish when it's ready .

@amogh-jahagirdar amogh-jahagirdar force-pushed the flink-branch-writes branch 3 times, most recently from f9a8f60 to ca5ddd0 Compare January 28, 2023 01:17
@hililiwei
Copy link
Contributor

Thanks for the PR @amogh-jahagirdar! Left one small question.
Also, do we have this feature in FlinkSource?

Thanks for the review @pvary ! Not yet, but I'm working on a PR for it, will publish when it's ready .

A long time ago, I raised a related #5029 to do this, if you can, can you take a look?

@jackye1995
Copy link
Contributor

A long time ago, I raised a related #5029 to do this, if you can, can you take a look?

Thanks, marking that as a part of the milestone!

@amogh-jahagirdar amogh-jahagirdar requested review from stevenzwu and removed request for pvary February 9, 2023 15:47
new Object[] {"parquet", 2},
new Object[] {"orc", 1},
new Object[] {"orc", 2}
new Object[] {"avro", 1, "main"},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we use SnapshotRef.MAIN_BRANCH for all the places we have "main" hard-coded?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also similar to the comment in Spark https://github.com/apache/iceberg/pull/6651/files#r1101939155, can we reduce the number of combinations here?

@stevenzwu stevenzwu merged commit a7a6d35 into apache:master Feb 13, 2023
@stevenzwu
Copy link
Contributor

thanks @amogh-jahagirdar for the contribution and @jackye1995 and @hililiwei for review.

@amogh-jahagirdar
Copy link
Contributor Author

Thanks for the reviews! @stevenzwu @jackye1995 @hililiwei

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants