Flink: Support create iceberg table with 'connector'='iceberg' #2666
Conversation
options.add(CATALOG_TYPE);
options.add(CATALOG_NAME);
options.add(CATALOG_DATABASE);
return Sets.newHashSet();
Are you intending to return options here? Possibly I am not understanding the interface fully, but it seems like options should be returned.
You are correct, we will need to return the options. Thanks for checking.
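A minimal sketch of the agreed fix, assuming the snippet above is the factory's optionalOptions() override (the option constants come from the diff):

@Override
public Set<ConfigOption<?>> optionalOptions() {
  // Collect the connector's optional options and return that set,
  // instead of returning a fresh empty set as in the snippet above.
  Set<ConfigOption<?>> options = Sets.newHashSet();
  options.add(CATALOG_TYPE);
  options.add(CATALOG_NAME);
  options.add(CATALOG_DATABASE);
  return options;
}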
return String.format("file://%s", warehouse.getRoot().getAbsolutePath());
}

protected List<Row> sql(String query, Object... args) {
FlinkTestBase already has a sql method. It seems that the only difference is the extra call to tableResult.await() here. Is it necessary, considering we are calling tableResult.collect()?
+1 on double checking whether it's needed at all. When I removed the whole function, the test still passed.
If there is an objective reason why this is needed, then @Override must be added.
Sounds reasonable.
| tableProps.put("catalog-database", "local_db"); | ||
| tableProps.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseRoot()); | ||
|
|
||
| sql("CREATE TABLE hadoop_table (id BIGINT, data STRING) WITH %s", toWithClause(tableProps)); |
Here we configure the catalog-name and catalog-database in the props, while Flink SQL also supports the namespace syntax. I am trying to understand whether that is a problem.
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create/
CREATE TABLE [catalog_name.][db_name.]table_name
Here I'd like to explain why I planned to introduce catalog-name & catalog-database in the table properties. For Flink users, the SQL CREATE TABLE iceberg_sink(id BIGINT, data STRING) WITH ('connector'='iceberg', ... ) actually creates a Flink table that is managed in Flink's in-memory catalog (default_catalog and default_database); it doesn't have any relationship to the Iceberg catalog, which manages the storage layer's database -> table -> table location and provides global table lock services.
Those table properties from the Flink CREATE DDL actually build the mapping between Flink's in-memory catalog and the storage-layer catalog.
If we use CREATE TABLE [catalog_name.][db_name.]table_name, which is the way we currently provide in the documentation, then the Flink catalog is exactly the same as the Iceberg catalog.
I understand the meaning of specifying the catalog database, but I still don't understand why the catalog name must be specified. What's the use of specifying the catalog name? For Flink, any value of catalog name can be specified, which has no impact on my data processing tasks. @openinx
Preconditions.checkNotNull(catalogDatabase, "Table property '%s' cannot be null", CATALOG_DATABASE.key());

org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
CatalogLoader catalogLoader = FlinkCatalogFactory.createCatalogLoader(catalogName, tableProps, hadoopConf);
I am a little confused. FlinkCatalogFactory#createCatalogLoader is a non-static method, so how does this line work?
Also, since we are creating a FlinkCatalogFactory instance below, we can call the non-static createCatalogLoader there.
@stevenzwu please see above. In this PR, createCatalogLoader is made static. +1 on not making it static.
Yes, we could use the factory created below to create the CatalogLoader instance; we don't have to make it static here.
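A minimal sketch of that suggestion, assuming createCatalogLoader stays an instance method with the (name, properties, hadoopConf) signature shown in the diff and is visible to this caller:

org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
// Reuse a factory instance instead of making createCatalogLoader static.
FlinkCatalogFactory factory = new FlinkCatalogFactory();
CatalogLoader catalogLoader = factory.createCatalogLoader(catalogName, tableProps, hadoopConf);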
@Rule
public final TemporaryFolder warehouse = new TemporaryFolder();
Nit: Extra newline can be removed.
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .useBlinkPlanner()
    .inBatchMode()
I've double checked that it works with inStreamingMode(). It would be good to add parameterized tests which run in both modes.
Yes, we can add the parameterized isStreaming to test both the batch job and the streaming job, although I think the batch job is enough to cover this PR's changes.
OK, this is nice to have.
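A sketch of the parameterization being discussed, using JUnit 4's Parameterized runner (the class and field names are illustrative, not the PR's actual test):

import java.util.Arrays;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestFlinkIcebergConnector {  // hypothetical test class name

  @Parameterized.Parameters(name = "isStreaming={0}")
  public static Iterable<Object[]> parameters() {
    return Arrays.asList(new Object[] {true}, new Object[] {false});
  }

  private final boolean isStreaming;

  public TestFlinkIcebergConnector(boolean isStreaming) {
    this.isStreaming = isStreaming;
  }

  private TableEnvironment getTableEnv() {
    // Build a batch or streaming environment depending on the parameter.
    EnvironmentSettings.Builder settings = EnvironmentSettings.newInstance().useBlinkPlanner();
    return TableEnvironment.create(
        isStreaming ? settings.inStreamingMode().build() : settings.inBatchMode().build());
  }
}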
gaborgsomogyi left a comment:
LGTM.
@rdblue, @stevenzwu, do you have any other concerns about this PR? I think this is a very important issue for Flink users because it's more in line with their usage habits.

@openinx, sorry for the delay. I'll make some time to review this. I also know that while I was out there were a few PRs that I wasn't able to help move along. If you have a list of PRs that are important for you to get in, please send them to me and I'll make time to get them reviewed as I can. Thank you for being patient with me!
try {
  flinkCatalog.createDatabase(catalogDatabase, new CatalogDatabaseImpl(Maps.newHashMap(), null), true);
} catch (DatabaseAlreadyExistException | CatalogException e) {
  throw new RuntimeException(e);
nit: maybe add an error msg like "Failed to create database"
try {
  flinkCatalog.createTable(objectPath, catalogTable, true);
} catch (TableAlreadyExistException | CatalogException e) {
  throw new RuntimeException(e);
nit: add an error msg
// Drop and create it again.
sql("DROP TABLE %s", TABLE_NAME);
sql("CREATE TABLE %s (id BIGINT, data STRING) WITH %s", TABLE_NAME, toWithClause(tableProps));
Assert.assertEquals("Should have expected rows",
Drop table doesn't purge data and metadata files, right? Creating the table again will be able to see the old data. I guess it is to prevent disaster from an accidental "drop table"?
drop table doesn't purge data and metadata files, right?
Yes, this DROP TABLE test_table is actually doing DROP TABLE default_catalog.default_database.test_table, which means dropping test_table from Flink's in-memory catalog; the underlying Iceberg catalog & table (test-hadoop.default.test_table) that the Flink table is mapped to won't be affected. Here I want to make sure that we can map the Flink in-memory table to the underlying Iceberg table again once we drop and create it again.
| sql("SELECT * FROM %s", TABLE_NAME)); | ||
|
|
||
| sql("DROP TABLE %s", TABLE_NAME); | ||
| HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(hiveConf); |
Should we run this cleanup block in a try-finally? Another option is to split this file into two files (one for Hadoop and one for Hive). We can also potentially remove some of the redundant code with a base test class.
Moving the cleanup block into a try-finally looks great to me. About splitting it into two files for Hadoop and Hive: I don't think we'll have too many test branches for Hadoop and Hive, because most of the Flink+Iceberg integration test work is accomplished in the Flink catalog test suite. This UT is only used for making CREATE TABLE xx () WITH ('connector'='iceberg') work, so I kept it in one test class.
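A minimal sketch of the try-finally suggestion; metaStoreClient, hiveConf, TABLE_NAME, and the sql helper come from the surrounding test, so this only shows the shape of the change:

HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(hiveConf);
try {
  // ... create the table, write rows, and run the SELECT / metastore assertions ...
} finally {
  // Cleanup runs even if an assertion above fails.
  sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
  metaStoreClient.close();
}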
@rdblue, thanks for your time. All the PRs published by me are here: https://github.com/apache/iceberg/pulls/openinx. There are three parts:
stevenzwu left a comment:
LGTM
if (catalog != null) {
  tableLoader = createTableLoader(catalog, objectIdentifier.toObjectPath());
} else {
  tableLoader = createTableLoader(catalogTable, tableProps, objectIdentifier.getObjectName());
Does this discard the rest of the identifier? What if the database is non-null? It seems like it should not be ignored and is a better way to get the database than a property.
I think this comment is similar to @stevenzwu's question. Say the Flink table identifier is flink_catalog.flink_database.table_name (when creating the table using connector=iceberg, not the Iceberg catalog approach); then we are mapping the Flink table named table_name to the underlying Iceberg table table_name in the database configured in the table properties. Flink's flink_catalog and flink_database don't have any relationship to Iceberg's catalog & database.
Maybe it's better to forbid people from specifying a connector=iceberg property when creating a table under the Iceberg catalog (adding a Preconditions.checkArgument in createDynamicTableSource and createDynamicTableSink); then we don't mix the two approaches to create a Flink+Iceberg table or to map a Flink table to an Iceberg table.
Okay, I thought that this was a way to run DDL even if you don't have an Iceberg catalog defined. It sort of does that, but it also creates a reference in the in-memory catalog. That's fine, but it does bring up a couple other questions:
- How do you create an in-memory catalog table pointing to an Iceberg table if the Iceberg table already exists? What if the DDL, like the schema, doesn't match?
- Why share the table name between the in-memory and external catalog but not the database? I think it makes sense to default the external database and table name using the ones from the DDL command but allow both to be overridden.
How do you create an in-memory catalog table pointing to an Iceberg table if the Iceberg table already exists? What if the DDL, like the schema, doesn't match?
If the underlying Iceberg table already exists, we still need to create the in-memory catalog table pointing to it. If the in-memory catalog table schema does not match the underlying Iceberg table, the CREATE TABLE statement won't throw any exception, but a SELECT or INSERT INTO query will throw an exception if the RowData read from the Iceberg table cannot be parsed with the in-memory catalog table schema. People will then need to re-create the in-memory table and map it to the underlying table again. That's the default behavior for Flink users, because almost all of the Flink connectors behave similarly (such as the JDBC connector, Hive connector, and HBase connector).
Why share the table name between the in-memory and external catalog but not the database?
I think you are right. I checked the JDBC connector; we can specify a different JDBC table name when creating the Flink table. It makes sense to allow both the db & table name to be overridden.
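A purely hypothetical sketch of that idea: resolve the target database and table from the table properties, falling back to the Flink identifier. The 'catalog-table' key is invented here for illustration; only 'catalog-database' appears in this PR.

// Defaults come from the DDL identifier; both could be overridden by table properties.
String database = tableProps.getOrDefault("catalog-database", objectIdentifier.getDatabaseName());
String table = tableProps.getOrDefault("catalog-table", objectIdentifier.getObjectName());
TableIdentifier icebergIdentifier = TableIdentifier.of(database, table);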
TableLoader tableLoader = createTableLoader(objectPath);
ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
Map<String, String> tableProps = context.getCatalogTable().getOptions();
CatalogTable catalogTable = context.getCatalogTable();
When is catalogTable set in the context? Is it possible that it is set and the factory's context is non-null? In that case, should an error be thrown?
I think you are asking about this issue: is it possible that we access the catalogTable via context.getCatalogTable() before it is set inside the context, in which case catalogTable would be null?
I don't think we have to worry about that, because the catalogTable is set in the context constructor, and in the Flink code path the catalogTable that is set must not be null. So I think it's OK here :-)
Sorry, I meant is there a case when context.getCatalogTable() is non-null but passed to a source where catalog is set in the constructor?
I got your question. Yes, it's possible in that case. For example, when we create a table under the Iceberg catalog in Flink SQL, it will create the FlinkDynamicTableFactory with the specified Iceberg catalog.
There should be no problem in that case. The catalogTable is the correct Flink table parsed from SQL such as INSERT INTO iceberg_catalog.iceberg_db.iceberg_table, although we don't use it in the current code path.
try {
  flinkCatalog.createDatabase(catalogDatabase, new CatalogDatabaseImpl(Maps.newHashMap(), null), true);
} catch (DatabaseAlreadyExistException | CatalogException e) {
  throw new RuntimeException(String.format("Failed to create database %s.%s", catalogName, catalogDatabase), e);
What about using Iceberg's exceptions here, like AlreadyExistsException, instead of RuntimeException?
Yeah, that looks pretty good to me!
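A sketch of how that could look, assuming org.apache.iceberg.exceptions.AlreadyExistsException with its (cause, message, args) constructor:

try {
  flinkCatalog.createDatabase(catalogDatabase, new CatalogDatabaseImpl(Maps.newHashMap(), null), true);
} catch (DatabaseAlreadyExistException e) {
  // Surface Iceberg's own exception type for the already-exists case.
  throw new AlreadyExistsException(e, "Database %s.%s already exists", catalogName, catalogDatabase);
} catch (CatalogException e) {
  throw new RuntimeException(String.format("Failed to create database %s.%s", catalogName, catalogDatabase), e);
}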
try {
  flinkCatalog.createTable(objectPath, catalogTable, true);
} catch (TableAlreadyExistException | CatalogException e) {
  throw new RuntimeException(String.format("Failed to create table %s.%s", catalogName, objectPath), e);
Same here. I think it is better to throw Iceberg exceptions if they exist.
}

@Test
public void testHadoop() {
I would like to have tests for when the database option and the one from the identifier conflict.
private final boolean isStreaming;
private volatile TableEnvironment tEnv;

@Parameterized.Parameters(name = "isStreaming={0}")
Could the catalog be a parameter as well? We do that in Spark tests and it works well.
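A sketch of what parameterizing over the catalog as well could look like (the catalog names here are illustrative):

@Parameterized.Parameters(name = "catalogName={0}, isStreaming={1}")
public static Iterable<Object[]> parameters() {
  // Cover each catalog backend in both execution modes.
  return Arrays.asList(
      new Object[] {"testhadoop", true},
      new Object[] {"testhadoop", false},
      new Object[] {"testhive", true},
      new Object[] {"testhive", false});
}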
// Drop and create it again.
sql("DROP TABLE %s", TABLE_NAME);
sql("CREATE TABLE %s (id BIGINT, data STRING) WITH %s", TABLE_NAME, toWithClause(tableProps));
Is it possible to omit the schema for cases where the underlying table already exists?
Unfortunately, Flink SQL doesn't provide syntax to support that now, although I agree it would be a better user experience. Flink SQL currently provides CREATE TABLE a LIKE b, but it still requires that table b is a Flink catalog table.
Ping @rdblue, how do you feel about this PR now?
@openinx, I'll take another look at this. Is there a description of the proposed behavior for these tables in the Flink catalog?
Yes, the default in-memory Flink catalog maintains those tables inside it. Once people drop those tables from the in-memory Flink catalog, it will no longer see them, but the underlying Iceberg tables in the file system are still there. If people want to drop the data from those tables, the correct way is to drop the tables from the backend catalog.
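A minimal sketch of dropping through the backend catalog so the data is actually removed, here using org.apache.iceberg.hadoop.HadoopCatalog and org.apache.iceberg.catalog.TableIdentifier with placeholder names and paths:

Configuration hadoopConf = new Configuration();
HadoopCatalog icebergCatalog = new HadoopCatalog(hadoopConf, "file:///tmp/warehouse");
// purge = true removes data and metadata files, unlike dropping the Flink in-memory table.
icebergCatalog.dropTable(TableIdentifier.of("default", "test_table"), true /* purge */);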
I think this PR has been fully reviewed. Since this useful feature has been blocked for so long, I plan to get it merged into the official Apache Iceberg repo. If people have more comments or improvements, please file a new issue or PR to address them. Thanks all for reviewing!
This PR is trying to address issue #2572.
People who want to create an Iceberg table under a Hadoop catalog could use SQL like the following:
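A reconstructed sketch of such a DDL, with property keys taken from this PR's discussion (the catalog, database, and warehouse values are placeholders, and tEnv is assumed to be a Flink TableEnvironment):

// The DDL string itself is the point here; it is executed through Flink's TableEnvironment.
tEnv.executeSql(
    "CREATE TABLE hadoop_table (id BIGINT, data STRING) WITH (" +
    "'connector'='iceberg', 'catalog-type'='hadoop', 'catalog-name'='hadoop_catalog', " +
    "'catalog-database'='hadoop_db', 'warehouse'='file:///tmp/warehouse')");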
To create an Iceberg table under a Hive catalog, we could use SQL like the following:
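Similarly, a sketch for the Hive catalog case; the 'uri' value (assumed to be the Hive metastore thrift URI) and the other values are placeholders:

tEnv.executeSql(
    "CREATE TABLE hive_table (id BIGINT, data STRING) WITH (" +
    "'connector'='iceberg', 'catalog-type'='hive', 'catalog-name'='hive_catalog', " +
    "'catalog-database'='hive_db', 'uri'='thrift://localhost:9083', " +
    "'warehouse'='hdfs://nn:8020/path/to/warehouse')");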
The Flink CREATE TABLE with the 'connector'='iceberg' option actually maps the underlying Iceberg table to a Flink table managed in Flink's in-memory catalog, so in theory, executing an ALTER TABLE or DROP TABLE clause won't affect the underlying Iceberg table. For convenience, this PR will try to initialize/create the underlying Iceberg table if it does not exist when mapping the Flink table from the in-memory catalog to the Iceberg table, so people don't need to create the real Iceberg table before mapping the Flink table to it.