Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/docs/flink-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ title: "Flink Connector"
-->

# Flink Connector
Apache Flink supports creating Iceberg table directly without creating the explicit Flink catalog in Flink SQL. That means we can just create an iceberg table by specifying `'connector'='iceberg'` table option in Flink SQL which is similar to usage in the Flink official [document](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/).
Apache Flink supports creating Iceberg table directly without creating the explicit Flink catalog in Flink SQL. That means we can just create an iceberg table by specifying `'connector'='iceberg'` table option in Flink SQL which is similar to usage in the Flink official [document](https://nightlies.apache.org/flink/flink-docs-release-{{ flinkVersionMajor }}/docs/connectors/table/overview/).

In Flink, the SQL `CREATE TABLE test (..) WITH ('connector'='iceberg', ...)` will create a Flink table in current Flink catalog (use [GenericInMemoryCatalog](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/catalogs/#genericinmemorycatalog) by default),
In Flink, the SQL `CREATE TABLE test (..) WITH ('connector'='iceberg', ...)` will create a Flink table in current Flink catalog (use [GenericInMemoryCatalog](https://ci.apache.org/projects/flink/flink-docs-release-{{ flinkVersionMajor }}/docs/dev/table/catalogs/#genericinmemorycatalog) by default),
which is just mapping to the underlying iceberg table instead of maintaining iceberg table directly in current Flink catalog.

To create the table in Flink SQL by using SQL syntax `CREATE TABLE test (..) WITH ('connector'='iceberg', ...)`, Flink iceberg connector provides the following table properties:
Expand Down
4 changes: 2 additions & 2 deletions docs/docs/flink-ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ CREATE TABLE `hive_catalog`.`default`.`sample` (
) WITH ('format-version'='2');
```

Table create commands support the commonly used [Flink create clauses](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/) including:
Table create commands support the commonly used [Flink create clauses](https://nightlies.apache.org/flink/flink-docs-release-{{ flinkVersionMajor }}/docs/dev/table/sql/create/) including:

* `PARTITION BY (column1, column2, ...)` to configure partitioning, Flink does not yet support hidden partitioning.
* `COMMENT 'table document'` to set a table description.
Expand Down Expand Up @@ -195,7 +195,7 @@ CREATE TABLE `hive_catalog`.`default`.`sample` (
CREATE TABLE `hive_catalog`.`default`.`sample_like` LIKE `hive_catalog`.`default`.`sample`;
```

For more details, refer to the [Flink `CREATE TABLE` documentation](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/).
For more details, refer to the [Flink `CREATE TABLE` documentation](https://nightlies.apache.org/flink/flink-docs-release-{{ flinkVersionMajor }}/docs/dev/table/sql/create/).


### `ALTER TABLE`
Expand Down
4 changes: 2 additions & 2 deletions docs/docs/flink-queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ DataStream<Row> stream = env.fromSource(source, WatermarkStrategy.noWatermarks()

### Emitting watermarks
Emitting watermarks from the source itself could be beneficial for several purposes, like harnessing the
[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment),
or prevent triggering [windows](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/)
[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-release-{{ flinkVersionMajor }}/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment),
or prevent triggering [windows](https://nightlies.apache.org/flink/flink-docs-release-{{ flinkVersionMajor }}/docs/dev/datastream/operators/windows/)
too early when reading multiple data files concurrently.

Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`.
Expand Down
22 changes: 11 additions & 11 deletions docs/docs/flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ Apache Iceberg supports both [Apache Flink](https://flink.apache.org/)'s DataStr

## Preparation when using Flink SQL Client

To create Iceberg table in Flink, it is recommended to use [Flink SQL Client](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html) as it's easier for users to understand the concepts.
To create Iceberg table in Flink, it is recommended to use [Flink SQL Client](https://ci.apache.org/projects/flink/flink-docs-release-{{ flinkVersionMajor }}/dev/table/sqlClient.html) as it's easier for users to understand the concepts.

Download Flink from the [Apache download page](https://flink.apache.org/downloads.html). Iceberg uses Scala 2.12 when compiling the Apache `iceberg-flink-runtime` jar, so it's recommended to use Flink 1.16 bundled with Scala 2.12.
Download Flink from the [Apache download page](https://flink.apache.org/downloads.html). Iceberg uses Scala 2.12 when compiling the Apache `iceberg-flink-runtime` jar, so it's recommended to use Flink {{ flinkVersionMajor }} bundled with Scala 2.12.

```bash
FLINK_VERSION=1.16.2
FLINK_VERSION={{ flinkVersion }}
SCALA_VERSION=2.12
APACHE_FLINK_URL=https://archive.apache.org/dist/flink/
wget ${APACHE_FLINK_URL}/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz
Expand All @@ -69,8 +69,7 @@ export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/start-cluster.sh
```

<!-- markdown-link-check-disable-next-line -->
Start the Flink SQL client. There is a separate `flink-runtime` module in the Iceberg project to generate a bundled jar, which could be loaded by Flink SQL client directly. To build the `flink-runtime` bundled jar manually, build the `iceberg` project, and it will generate the jar under `<iceberg-root-dir>/flink-runtime/build/libs`. Or download the `flink-runtime` jar from the [Apache repository](https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.16/{{ icebergVersion }}/).
Start the Flink SQL client. There is a separate `flink-runtime` module in the Iceberg project to generate a bundled jar, which could be loaded by Flink SQL client directly. To build the `flink-runtime` bundled jar manually, build the `iceberg` project, and it will generate the jar under `<iceberg-root-dir>/flink-runtime/build/libs`. Or download the `flink-runtime` jar from the [Apache repository](https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-{{ flinkVersionMajor }}/{{ icebergVersion }}/).

```bash
# HADOOP_HOME is your hadoop root directory after unpack the binary package.
Expand All @@ -84,7 +83,7 @@ put iceberg-flink-runtime-1.16-{{ icebergVersion }}.jar in flink/lib dir
./bin/sql-client.sh embedded shell
```

By default, Iceberg ships with Hadoop jars for Hadoop catalog. To use Hive catalog, load the Hive jars when opening the Flink SQL client. Fortunately, Flink has provided a [bundled hive jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.9_2.12/1.16.2/flink-sql-connector-hive-2.3.9_2.12-1.16.2.jar) for the SQL client. An example on how to download the dependencies and get started:
By default, Iceberg ships with Hadoop jars for Hadoop catalog. To use Hive catalog, load the Hive jars when opening the Flink SQL client. Fortunately, Flink has provided a [bundled hive jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.9_2.12/{{ flinkVersion }}/flink-sql-connector-hive-2.3.9_2.12-{{ flinkVersion }}.jar) for the SQL client. An example on how to download the dependencies and get started:

```bash
# HADOOP_HOME is your hadoop root directory after unpack the binary package.
Expand All @@ -94,11 +93,12 @@ ICEBERG_VERSION={{ icebergVersion }}
MAVEN_URL=https://repo1.maven.org/maven2
ICEBERG_MAVEN_URL=${MAVEN_URL}/org/apache/iceberg
ICEBERG_PACKAGE=iceberg-flink-runtime
FLINK_VERSION_MAJOR={{ flinkVersionMajor }}
wget ${ICEBERG_MAVEN_URL}/${ICEBERG_PACKAGE}-${FLINK_VERSION_MAJOR}/${ICEBERG_VERSION}/${ICEBERG_PACKAGE}-${FLINK_VERSION_MAJOR}-${ICEBERG_VERSION}.jar -P lib/

HIVE_VERSION=2.3.9
SCALA_VERSION=2.12
FLINK_VERSION=1.16.2
FLINK_VERSION={{ flinkVersion }}
FLINK_CONNECTOR_URL=${MAVEN_URL}/org/apache/flink
FLINK_CONNECTOR_PACKAGE=flink-sql-connector-hive
wget ${FLINK_CONNECTOR_URL}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_VERSION}/${FLINK_VERSION}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_VERSION}-${FLINK_VERSION}.jar
Expand All @@ -115,23 +115,23 @@ wget ${FLINK_CONNECTOR_URL}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_V
Install the Apache Flink dependency using `pip`:

```python
pip install apache-flink==1.16.2
pip install apache-flink=={{ flinkVersion }}
```

Provide a `file://` path to the `iceberg-flink-runtime` jar, which can be obtained by building the project and looking at `<iceberg-root-dir>/flink-runtime/build/libs`, or downloading it from the [Apache official repository](https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/). Third-party jars can be added to `pyflink` via:

- `env.add_jars("file:///my/jar/path/connector.jar")`
- `table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar")`

This is also mentioned in the official [docs](https://ci.apache.org/projects/flink/flink-docs-release-1.16/docs/dev/python/dependency_management/). The example below uses `env.add_jars(..)`:
This is also mentioned in the official [docs](https://ci.apache.org/projects/flink/flink-docs-release-{{ flinkVersionMajor }}/docs/dev/python/dependency_management/). The example below uses `env.add_jars(..)`:

```python
import os

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
iceberg_flink_runtime_jar = os.path.join(os.getcwd(), "iceberg-flink-runtime-1.16-{{ icebergVersion }}.jar")
iceberg_flink_runtime_jar = os.path.join(os.getcwd(), "iceberg-flink-runtime-{{ flinkVersionMajor }}-{{ icebergVersion }}.jar")

env.add_jars("file://{}".format(iceberg_flink_runtime_jar))
```
Expand Down Expand Up @@ -172,7 +172,7 @@ Run a query:
5 rows in set
```

For more details, please refer to the [Python Table API](https://ci.apache.org/projects/flink/flink-docs-release-1.16/docs/dev/python/table/intro_to_table_api/).
For more details, please refer to the [Python Table API](https://ci.apache.org/projects/flink/flink-docs-release-{{ flinkVersionMajor }}/docs/dev/python/table/intro_to_table_api/).

## Adding catalogs.

Expand Down
2 changes: 2 additions & 0 deletions site/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ markdown_extensions:
extra:
icebergVersion: '1.5.2'
nessieVersion: '0.77.1'
flinkVersion: '1.19.0'
flinkVersionMajor: '1.19'
social:
- icon: fontawesome/regular/comments
link: 'https://iceberg.apache.org/community/'
Expand Down