
Conversation

@openinx (Member) commented Sep 16, 2020

It's the documentation for the flink sink connector; pls apply this patch after PR #1423 has been applied.

This creates an iceberg catalog named `hive_catalog` that loads tables from a hive metastore:

```sql
create catalog hive_catalog with(
  ...
);
```

Contributor:

This is a SQL command, right? So we might want to use upper case to distinguish the keywords?

CREATE CATALOG hive_catalog WITH (
  ...
);

Member Author:

Yeah, we'd better use upper case for keywords.
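
For reference, a fuller sketch of the statement, assembled from the catalog properties that appear later in this conversation; the thrift URI and client count are placeholder values:

```sql
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1'
);
```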

We could execute the SQL command `use catalog hive_catalog` to use one of the catalogs.

Contributor:

This sets the current catalog?

Member Author:

Yes, maybe I could write this more clearly:

We could execute the SQL command `use catalog hive_catalog` to set the current catalog.
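
A minimal sketch of that command in the Flink SQL client, using the upper-case keyword style agreed above:

```sql
USE CATALOG hive_catalog;
```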


### Adding/Removing/Renaming table columns

Iceberg does not support adding/removing/renaming table columns through Flink SQL yet.

Contributor:

Could we move these support notes into the table at the top? I think it would be good to have the description for ALTER TABLE note that columns can't be changed with Flink SQL. I'd prefer not to have sections that state what Flink does not support, since we want that easily available in the compatibility table. That also makes it easier to update the docs because we only add sections, we don't have to find and remove them.


Table create commands support most common [flink create clauses](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table) now, including:

* `PARTITION BY (column1, column2, ...)` to configure partitioning.

Contributor:

Should this note that PARTITION BY does not yet support hidden partitioning?


* `PARTITION BY (column1, column2, ...)` to configure partitioning.
* `COMMENT 'table document'` to set a table description.
* `WITH ('key'='value', ...)` to set [table configuration](../configuration)

Contributor:

Are these stored in table properties?

Member Author:

Yes, pls see the linked code; I will describe it more clearly in this document.
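
To make this concrete, a hedged sketch that combines the clauses listed above on the `sample` table used elsewhere in this document; note that Flink DDL spells the partitioning clause `PARTITIONED BY`, and `write.format.default` is just one example of a property stored on the table:

```sql
CREATE TABLE hive_catalog.default.sample (
  id BIGINT,
  data STRING
) COMMENT 'iceberg sample table'
PARTITIONED BY (data)
WITH ('write.format.default'='avro');
```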

Iceberg only supports altering table properties in Flink 1.11 now.

```sql
ALTER TABLE hive_catalog.default.sample SET ('write.format.default'='avro')
```

Contributor:

Is UNSET supported as well?

Member Author:

Currently, Flink does not support UNSET yet; it only provides the SET syntax. That means we cannot remove table properties from an Iceberg table, but we can set them to a target value. It sounds not so friendly to users but meets their requirements for now. @JingsongLi, we may need to propose that Apache Flink support UNSET?

Contributor:

There is a ticket tracking this: https://issues.apache.org/jira/browse/FLINK-19062
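
A hedged sketch of the workaround implied above: with no `UNSET`, a property can only be written back to the value you want (here assuming parquet, Iceberg's default file format):

```sql
-- there is no UNSET in Flink SQL yet; overwrite the property with the desired value instead
ALTER TABLE hive_catalog.default.sample SET ('write.format.default'='parquet');
```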


### `INSERT OVERWRITE`

To replace data in the table with the result of a query, use `INSERT OVERWRITE`. Overwrites are atomic operations for Iceberg tables.

Contributor:

INSERT OVERWRITE can only be used in batch mode, right? Otherwise it doesn't make sense as an operation because you would only ever get 1 checkpoint of data in a partition.

If so, it would be good to have a note that calls it out.

Member Author:

Yes, you're right. Pls see the code; we have a precondition check that disables overwriting data in streaming mode.

Will provide the note.
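
As a hedged illustration of that note, a batch-mode overwrite without a `PARTITION` clause; the source table `hive_catalog.default.other_table` is hypothetical, and on an unpartitioned table this replaces all of the table's data:

```sql
-- INSERT OVERWRITE is only accepted in batch mode
SET execution.type = batch;

INSERT OVERWRITE hive_catalog.default.sample
SELECT id, data FROM hive_catalog.default.other_table;
```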

Iceberg also supports overwriting given partitions with the `SELECT` values:

```sql
INSERT OVERWRITE hive_catalog.default.sample PARTITION(data='a') SELECT 6;
```

Contributor:

The documentation for overwrites doesn't account for hidden partitions.

When all the partition columns are set a value in PARTITION clause, it is inserting into a static partition. . . . When partial partition columns (prefix part of all partition columns) are set a value in PARTITION clause, it is writing the query result into a dynamic partition.

It would be great to clarify how hidden partitions are handled in that documentation, but it definitely needs to be covered here.

I think this should also specifically say that unpartitioned tables are completely overwritten by INSERT OVERWRITE.

Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

FlinkSink.forRowData(dataStream)

Contributor:

Should this be input?

Member Author:

Yes, let me fix this typo.


We've created a separate `flink-runtime` module in the iceberg project to generate a bundled jar, which could be loaded by the flink SQL client directly.

To build the `flink-runtime` bundled jar manually, just build the `iceberg` project; the jar will be generated under `<iceberg-root-dir>/flink-runtime/build/libs`. Of course, we could also download the `flink-runtime` jar from the [apache official repository](https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink/).

Contributor:

The apache official repository does not provide iceberg-flink-runtime.jar now; the link is iceberg-flink, not iceberg-flink-runtime.jar.

Member Author:

I expected that the iceberg-flink-runtime jar will be deployed there once we release 0.10.0, although we have not accomplished that yet.

OK, the path should be changed from iceberg-flink to iceberg-flink-runtime.

@zhangjun0x01 (Contributor):

Is there a requirement for the version of Hive?

'uri'='thrift://localhost:9083',
'clients'='5',
'property-version'='1'
);

Member Author:

It's necessary to fix issue #1437 before we get this PR merged, because when I create the hive_catalog, it tries to create the default database but fails:

Flink SQL> CREATE CATALOG hive_catalog WITH (
>   'type'='iceberg',
>   'catalog-type'='hive',
>   'uri'='thrift://localhost:9083',
>   'clients'='5',
>   'property-version'='1'
> );
2020-09-24 18:24:21,111 WARN  org.apache.flink.table.client.cli.CliClient                  [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Could not execute statement: CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1'
)
	at org.apache.flink.table.client.gateway.local.LocalExecutor.executeSql(LocalExecutor.java:362) ~[flink-sql-client_2.12-1.11.1.jar:1.11.1]
	at org.apache.flink.table.client.cli.CliClient.callDdl(CliClient.java:642) ~[flink-sql-client_2.12-1.11.1.jar:1.11.1]
	at org.apache.flink.table.client.cli.CliClient.callDdl(CliClient.java:637) ~[flink-sql-client_2.12-1.11.1.jar:1.11.1]
	at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:357) ~[flink-sql-client_2.12-1.11.1.jar:1.11.1]
	at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_221]
	at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212) [flink-sql-client_2.12-1.11.1.jar:1.11.1]
	at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142) [flink-sql-client_2.12-1.11.1.jar:1.11.1]
	at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) [flink-sql-client_2.12-1.11.1.jar:1.11.1]
	at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) [flink-sql-client_2.12-1.11.1.jar:1.11.1]
Caused by: java.lang.IllegalArgumentException: Can not create a Path from a null string
	at org.apache.hadoop.fs.Path.checkPathArg(Path.java:159) ~[hadoop-common-2.9.2.jar:?]
	at org.apache.hadoop.fs.Path.<init>(Path.java:175) ~[hadoop-common-2.9.2.jar:?]
	at org.apache.hadoop.fs.Path.<init>(Path.java:110) ~[hadoop-common-2.9.2.jar:?]
	at org.apache.iceberg.hive.HiveCatalog.convertToDatabase(HiveCatalog.java:459) ~[?:?]
	at org.apache.iceberg.hive.HiveCatalog.lambda$createNamespace$5(HiveCatalog.java:214) ~[?:?]
	at org.apache.iceberg.hive.ClientPool.run(ClientPool.java:54) ~[?:?]
	at org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:213) ~[?:?]
	at org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:195) ~[?:?]
	at org.apache.iceberg.flink.FlinkCatalog.open(FlinkCatalog.java:116) ~[?:?]
	at org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:191) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1086) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1019) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
	at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeSql$7(LocalExecutor.java:360) ~[flink-sql-client_2.12-1.11.1.jar:1.11.1]
	at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255) ~[flink-sql-client_2.12-1.11.1.jar:1.11.1]
	at org.apache.flink.table.client.gateway.local.LocalExecutor.executeSql(LocalExecutor.java:360) ~[flink-sql-client_2.12-1.11.1.jar:1.11.1]
	... 8 more

Contributor:

Actually, I'm not sure about that. Maybe we should not be using the value of hive.metastore.warehouse.dir.

If we are not using hive.metastore.uris, then it doesn't make sense to use the Hive warehouse path, either. Maybe we should support the warehouse config property for Hive catalog. Then we could recommend how to configure the catalog with it and not need to load hive-site.xml.

```java
env.execute("Test Iceberg DataStream");
```

The iceberg API also allows users to write a generic `DataStream<T>` to an iceberg table; more examples could be found in this [unit test](https://github.com/apache/iceberg/blob/master/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java).

Contributor:

I'm not sure that I agree that the API supports writing a generic DataStream<T> because the rows must be converted before they are actually written. Writing a generic T would require a way to control the data model. I think it may be worth adding support to write Iceberg generics, but probably not arbitrary T.

```sql
INSERT OVERWRITE hive_catalog.default.sample PARTITION(data='a') SELECT 6;
```

For a partitioned iceberg table, when all the partition columns are set to a value in the `PARTITION` clause, it is inserting into a static partition; otherwise, if only a prefix of the partition columns is set to a value in the `PARTITION` clause, it is writing the query result into a dynamic partition.

Contributor:

This copies from the Flink docs, but doesn't cover what happens with hidden partitioning. I think this should give an example and be specific: hidden partitions are overwritten dynamically.


To replace data in the table with the result of a query, use `INSERT OVERWRITE` in a batch job (flink streaming jobs do not support `INSERT OVERWRITE`). Overwrites are atomic operations for Iceberg tables.

Partitions that have rows produced by the SELECT query will be replaced, for example:

Contributor:

+1, this is a good way to state what will happen.

But, the example is incomplete because it doesn't state which partitions would be replaced. I think this needs to have an example table as well as a query. If Flink supports a CASE statement, then I suggest partitioning by "even" and "odd" ids. That's easy to understand and you can show what happens with very few rows.
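
In that spirit, a hedged sketch of which partitions get replaced; the `other_sample` source table is hypothetical, and `sample` is assumed to be partitioned by `data` with existing partitions `data='a'` and `data='b'`:

```sql
-- the query only produces rows for the partition data='a',
-- so only that partition is rewritten; data='b' keeps its existing rows
INSERT OVERWRITE hive_catalog.default.sample
SELECT id, data FROM hive_catalog.default.other_sample WHERE data = 'a';
```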

SET execution.type = streaming

-- Execute the flink job in batch mode for current session context
SET execution.type = batch

Contributor:

Is there a good place to link to in Flink docs for this?

Contributor:

+1

* Don't support creating iceberg tables with computed columns.
* Don't support creating iceberg tables with watermarks.
* Don't support adding, removing, renaming, or changing columns. [FLINK-19062](https://issues.apache.org/jira/browse/FLINK-19062) is tracking this.
* Don't support reading iceberg tables from flink in batch or streaming mode. [#1346](https://github.com/apache/iceberg/pull/1346) and [#1293](https://github.com/apache/iceberg/pull/1293) are tracking this.

Contributor:

I don't think that documentation is a good place for this. If you want, we could reference a tag in the issue tracker, or a milestone. But we don't want to need to update docs when these are done. Compatibility and features should be covered mainly in the table.

@rdblue merged commit c4f8bc3 into apache:master on Oct 1, 2020.

@rdblue (Contributor) commented Oct 1, 2020

I'm going to go ahead and merge this so we have something to work from in smaller PRs. Thanks @openinx!
