Conversation

@waterlx (Contributor) commented Mar 20, 2020

A Flink Iceberg sink that tries to address #567, modeled after nfflink-connector-iceberg.

A workable version that passes checkstyle, compiles against the latest master code, and has its basic functions verified:

  • DataFile generated
  • Committed successfully

Still being improved and polished.

The design doc contains:

  • Design extracted from nfflink-connector-iceberg
  • Future work and improvements to do
  • Sample code

@jerryshao (Contributor) commented Mar 23, 2020

IMO, this PR is too big to review; we should break it down into small tasks and submit them one by one.

Besides, we should have a design doc about this.

@waterlx (Contributor, Author) commented Mar 23, 2020

@jerryshao Thanks for the comments!

Yes, I will break it down into tasks/small pieces of code, each covering a separate function, which will be more convenient for reviewers.

As for the design doc, please take a look at the one I just uploaded.

*
* @param serializer Serialize input data type to Avro GenericRecord
*/
public IcebergSinkAppender<IN> withSerializer(AvroSerializer<IN> serializer) {
A contributor commented:

This should be removed.

versions.lock (outdated)
com.google.code.gson:gson:2.2.4 (2 constraints: 9518bfd2)
com.github.scopt:scopt_2.11:3.5.0 (1 constraints: 5a0ef868)
com.github.stephenc.findbugs:findbugs-annotations:1.3.9-1 (5 constraints: 693c9a5d)
com.google.code.findbugs:jsr305:3.0.2 (18 constraints: a0f346c9)
A contributor commented:

This dependency must be excluded because it has a questionable license.

versions.lock (outdated)
ch.qos.logback:logback-core:1.0.9 (4 constraints: dd32435a)
co.cask.tephra:tephra-api:0.6.0 (3 constraints: 0828ded1)
co.cask.tephra:tephra-core:0.6.0 (2 constraints: 831cd90d)
co.cask.tephra:tephra-hbase-compat-1.0:0.6.0 (1 constraints: 370d6920)
A contributor commented:

Looks like we need to go through dependencies and remove whatever isn't used.

@openinx (Member) left a comment

Thanks @waterlx for working on this.
I skimmed the patch; sharing two points:

  1. The current Flink connector seems bound to the Avro format. For Iceberg's connector, we had better add an abstraction that maps the Flink schema to the Iceberg schema, so that we can decouple from the underlying data format (see the sketch after this comment).
  2. The patch seems to depend on a few services inside Netflix, such as S3, Metacat, etc. It would be better to remove those.

Thanks.
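
To illustrate point 1, here is a rough, hypothetical sketch of what mapping a Flink RowType to an Iceberg Schema could look like, so the sink is not tied to Avro. The class name and the small set of handled types are assumptions for illustration, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

// Hypothetical converter: maps a Flink RowType to an Iceberg Schema so the
// connector can accept Flink rows without assuming an Avro input format.
public class FlinkToIcebergSchema {

  public static Schema convert(RowType rowType) {
    List<Types.NestedField> columns = new ArrayList<>();
    int fieldId = 1;
    for (RowType.RowField field : rowType.getFields()) {
      columns.add(Types.NestedField.optional(fieldId++, field.getName(), convert(field.getType())));
    }
    return new Schema(columns);
  }

  private static Type convert(LogicalType type) {
    // Only a few primitive types are handled in this sketch.
    switch (type.getTypeRoot()) {
      case VARCHAR:
        return Types.StringType.get();
      case INTEGER:
        return Types.IntegerType.get();
      case BIGINT:
        return Types.LongType.get();
      case DOUBLE:
        return Types.DoubleType.get();
      case BOOLEAN:
        return Types.BooleanType.get();
      default:
        throw new UnsupportedOperationException("Unhandled type: " + type);
    }
  }
}
```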

@waterlx (Contributor, Author) commented Mar 25, 2020

@rdblue @openinx Thanks very much for taking the time to review and comment!

I am working on some of the comments and have opened a few issues to track the relatively bigger ones. Future commits will be made per issue, which should make reviewing and tracking more convenient.

@rdblue would you mind creating a milestone for the Flink Iceberg sink?

@rdblue (Contributor) commented Mar 25, 2020

I created the milestone for a Flink sink. It looks like most of the issues are about removing things from this PR, but we don't want to commit this before those are done. You might consider breaking up the work into issues that describe a reasonable order for adding the needed parts of this PR to Iceberg, instead of removing the unnecessary parts from your branch.

@waterlx (Contributor, Author) commented Apr 2, 2020

Some updates here in case you are interested:

  1. Spectator and Metacat have been removed. Remove Metacat and use Catalog interface instead #867, Unify code path to support both HiveCatalog and HadoopCatalog #868, and Remove Spectator #871 are closed as a result.
  2. Some duplicated logic was added to determine whether it is a Hive Catalog or a Hadoop Catalog based on a setting, which I do not think fits well here, but it is used to check that the code path also works with a Hadoop Catalog. I plan to remove it or figure out a better way.
  3. Working on removing the S3/AWS SDK and rewriting that part using FileIO. (Use FileIO to replace S3 and AWS SDK #872)
  4. After all the removal work is done, I will work on breaking the logic up into reasonable pieces so they can be added to the code base and are friendly to review.
  5. The PR is still NOT ready for detailed review yet.

More on item 2 above:
The current logic passes a few parameters (Strings) to the Flink sink to tell it the namespace, the table, and even the catalog; the internal logic then creates a Catalog and loads the table. One way to simplify that would be to pass a Table instance, so the sink would not need to care whether the Table was loaded by a Hive/Hadoop Catalog or by HadoopTables. But that idea is blocked by BaseTable not being serializable, which forces the logic to do more work, such as creating a Catalog and loading the table (a rough sketch of this workaround follows below).
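
As a rough sketch of the workaround described above (names are hypothetical, and HadoopCatalog is used only as an example), the sink could ship plain strings and rebuild the table lazily inside the task instead of serializing the Table itself:

```java
import java.io.Serializable;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

// Hypothetical serializable loader: only strings are shipped to the task; the
// Catalog and Table are recreated on the task side when first needed.
public class CatalogTableLoader implements Serializable {

  private final String warehouseLocation;  // e.g. an HDFS or object-store path
  private final String tableIdentifier;    // e.g. "db.table"

  private transient Table table;           // rebuilt per task, never serialized

  public CatalogTableLoader(String warehouseLocation, String tableIdentifier) {
    this.warehouseLocation = warehouseLocation;
    this.tableIdentifier = tableIdentifier;
  }

  public Table loadTable() {
    if (table == null) {
      Catalog catalog = new HadoopCatalog(new Configuration(), warehouseLocation);
      table = catalog.loadTable(TableIdentifier.parse(tableIdentifier));
    }
    return table;
  }
}
```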

@jerryshao @chenjunjiedada @openinx @aokolnychyi @bowenli86 @stevenzwu @rdblue FYI

@rdblue (Contributor) commented Apr 3, 2020

The idea is blocked by BaseTable being not serializable, making the logic have to do more, such as new Catalog and load table.

Why do you need to serialize BaseTable? You should only need to serialize the table's schema, partition spec, and FileIO instance.

We don't want BaseTable to be Serializable to prevent problems:

  1. Table and BaseTable expose high-level operations that we don't want called from tasks. If it were easy to pass a table instance to tasks, it might also seem easy to commit data files from tasks in parallel, but that's a pattern we want to discourage.
  2. BaseTable wraps TableOperations, which is plugged in by the catalog and can be customized. Since this is likely to be implemented outside of Iceberg, we don't want to require serialization, which would make it harder to build.
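
As a minimal sketch of what the comment above suggests (the class name is hypothetical): the driver extracts the schema, partition spec, and FileIO from the Table, and only that serializable context is shipped to the writer tasks:

```java
import java.io.Serializable;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileIO;

// Hypothetical task-side context: Schema, PartitionSpec, and FileIO are
// Serializable, so they can be shipped to tasks without serializing BaseTable.
public class WriterTaskContext implements Serializable {

  private final Schema schema;
  private final PartitionSpec spec;
  private final FileIO io;

  public WriterTaskContext(Table table) {
    this.schema = table.schema();
    this.spec = table.spec();
    this.io = table.io();
  }

  public Schema schema() {
    return schema;
  }

  public PartitionSpec spec() {
    return spec;
  }

  public FileIO io() {
    return io;
  }
}
```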

@waterlx (Contributor, Author) commented Apr 21, 2020

@rdblue Thanks for sharing your thoughts on why Table/BaseTable is not serializable. Totally agree. But I am currently in a dilemma: there may be a need to call high-level operations in Flink tasks, like table.newTransaction(), when trying to commit the DataFiles accumulated from the streaming input. The current code limits the parallelism to 1 so that the commit is not performed in parallel.

For now, it is not a blocker, because I can pass the namespace and table name via config and call Catalog.loadTable() to build the table when needed. But the implementation is not that good, as the table information (namespace, table name, whether it is a HiveCatalog or a HadoopCatalog) gets passed everywhere, while some of it is not needed.

I am also thinking about passing the path as a string (db.table for HiveCatalog, a fully qualified path for HadoopTables) instead of passing a Table instance, but the purpose is still to rebuild the Table instance in order to call some high-level operations (a sketch of such a commit step follows below).
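
A minimal sketch of the commit step described above, assuming a single-parallelism committer that has already rebuilt the Table (names are hypothetical; a Transaction via table.newTransaction() could be used instead of a plain append):

```java
import java.util.List;

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

// Hypothetical committer: appends the DataFiles accumulated from the streaming
// writers in a single commit; runs with parallelism 1 to avoid concurrent commits.
public class DataFileCommitter {

  private final Table table;

  public DataFileCommitter(Table table) {
    this.table = table;
  }

  public void commit(List<DataFile> completedFiles) {
    if (completedFiles.isEmpty()) {
      return;
    }
    AppendFiles append = table.newAppend();
    for (DataFile file : completedFiles) {
      append.appendFile(file);
    }
    append.commit();
  }
}
```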

@waterlx (Contributor, Author) commented Apr 21, 2020

Some progress on the Flink Iceberg sink in case you are interested:

  1. S3 and the AWS SDK have been removed and FileIO is used instead, so Use FileIO to replace S3 and AWS SDK #872 is closed (see the FileIO sketch below).
  2. The usage of Avro and IndexedRecord is almost entirely removed. The input accepted by the sink is Iceberg's Record/GenericRecord; for other input types, a RecordSerializer is supposed to be implemented accordingly.
  3. Working on cleaning up the VTTS and watermark related code. It is tied to Avro and is the last remaining dependency on Avro.
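
A minimal sketch of what item 1 means in practice (class and method names are hypothetical): the sink asks the table's FileIO for output files instead of talking to S3 directly, so the same code path works for HDFS, S3, or any custom FileIO plugged in by the catalog:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;

// Hypothetical helper: writes bytes through the table's FileIO rather than an
// S3/AWS SDK client, keeping the sink independent of the storage system.
public class FileIoWriter {

  public static void writeBytes(Table table, String path, byte[] data) throws Exception {
    FileIO io = table.io();
    OutputFile outputFile = io.newOutputFile(path);
    try (PositionOutputStream out = outputFile.create()) {
      out.write(data);
    }
  }
}
```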

The PR has been updated with its basic functions verified, but it is still not ready for detailed review. Hopefully I will have a clean version by the end of this week.
@jerryshao @chenjunjiedada @openinx @aokolnychyi @bowenli86 @stevenzwu @rdblue FYI

waterlx closed this Apr 21, 2020
waterlx reopened this Apr 21, 2020
waterlx added 23 commits June 3, 2020 15:49
…elete count to pass the pre-check of MergeAppend
@rdblue (Contributor) commented Sep 11, 2020

Now that the sink is in, should we close this? @waterlx, @openinx

rdblue closed this Sep 11, 2020
@rdblue (Contributor) commented Sep 11, 2020

Oops. I clicked "close and comment" instead of "comment". I think it should be closed anyway, so I'll leave it as is. Feel free to reopen if this still has outstanding work. Thanks for all the great work!

@openinx (Member) commented Sep 16, 2020

I think it's time to close it now. I will take a look at this WIP PR again and update the improvement issue here if there's anything we've missed.
