Merged
8 changes: 4 additions & 4 deletions 1.5.0/docs/configuration.md
Original file line number Diff line number Diff line change
@@ -110,9 +110,9 @@ Iceberg tables support table properties to configure table behavior, like the de
Reserved table properties are only used to control behaviors when creating or updating a table.
The values of these properties are not persisted as part of the table metadata.

| Property | Default | Description |
| -------------- | -------- | ------------------------------------------------------------- |
| format-version | 2 | Table's format version (can be 1 or 2) as defined in the [Spec](../../../spec/#format-versioning). Defaults to 2 since version 1.4.0. |
| Property | Default | Description |
| -------------- | -------- |--------------------------------------------------------------------------------------------------------------------------------------|
| format-version | 2 | Table's format version (can be 1 or 2) as defined in the [Spec](../../spec.md#format-versioning). Defaults to 2 since version 1.4.0. |
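
As a sketch of how the reserved property is used (the table name is illustrative, assuming a Spark SQL session), `format-version` can be set at creation time:

```sql
-- 'format-version' is only read when creating or updating the table;
-- it is not stored as a regular table property afterwards.
CREATE TABLE prod.db.sample (id BIGINT, data STRING)
USING iceberg
TBLPROPERTIES ('format-version' = '2');
```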

### Compatibility flags

@@ -133,7 +133,7 @@ Iceberg catalogs support using catalog properties to configure catalog behaviors
| clients | 2 | client pool size |
| cache-enabled | true | Whether to cache catalog entries |
| cache.expiration-interval-ms | 30000 | How long catalog entries are locally cached, in milliseconds; 0 disables caching, negative values disable expiration |
| metrics-reporter-impl | org.apache.iceberg.metrics.LoggingMetricsReporter | Custom `MetricsReporter` implementation to use in a catalog. See the [Metrics reporting](../metrics-reporting.md) section for additional details |
| metrics-reporter-impl | org.apache.iceberg.metrics.LoggingMetricsReporter | Custom `MetricsReporter` implementation to use in a catalog. See the [Metrics reporting](metrics-reporting.md) section for additional details |

`HadoopCatalog` and `HiveCatalog` can access the properties in their constructors.
Any other custom catalog can access the properties by implementing `Catalog.initialize(catalogName, catalogProperties)`.
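
For example, a catalog created from Flink SQL might pass some of these properties (a sketch; the catalog name, URI, and property values are illustrative):

```sql
CREATE CATALOG hive_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'uri' = 'thrift://localhost:9083',
  -- catalog properties from the table above
  'clients' = '4',
  'cache-enabled' = 'false'
);
```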
2 changes: 1 addition & 1 deletion 1.5.0/docs/flink-actions.md
@@ -22,7 +22,7 @@ search:

## Rewrite files action

Iceberg provides an API to rewrite small files into large files by submitting Flink batch jobs. The behavior of this Flink action is the same as Spark's [rewriteDataFiles](../maintenance.md#compact-data-files).
Iceberg provides an API to rewrite small files into large files by submitting Flink batch jobs. The behavior of this Flink action is the same as Spark's [rewriteDataFiles](maintenance.md#compact-data-files).

```java
import org.apache.iceberg.flink.actions.Actions;
6 changes: 3 additions & 3 deletions 1.5.0/docs/flink-connector.md
@@ -31,13 +31,13 @@ To create the table in Flink SQL by using SQL syntax `CREATE TABLE test (..) WIT
* `connector`: Use the constant `iceberg`.
* `catalog-name`: User-specified catalog name. It's required because the connector doesn't have a default value.
* `catalog-type`: `hive` or `hadoop` for built-in catalogs (defaults to `hive`), or left unset for custom catalog implementations using `catalog-impl`.
* `catalog-impl`: The fully-qualified class name of a custom catalog implementation. Must be set if `catalog-type` is unset. See also [custom catalog](../flink.md#adding-catalogs) for more details.
* `catalog-impl`: The fully-qualified class name of a custom catalog implementation. Must be set if `catalog-type` is unset. See also [custom catalog](flink.md#adding-catalogs) for more details.
* `catalog-database`: The Iceberg database name in the backend catalog; defaults to the current Flink database name.
* `catalog-table`: The Iceberg table name in the backend catalog. Defaults to the table name in the Flink `CREATE TABLE` statement.

## Table managed in Hive catalog

Before executing the following SQL, please make sure you've configured the Flink SQL client correctly according to the [quick start documentation](../flink.md).
Before executing the following SQL, please make sure you've configured the Flink SQL client correctly according to the [quick start documentation](flink.md).

The following SQL will create a Flink table in the current Flink catalog, which maps to the Iceberg table `default_database.flink_table` managed in the Iceberg catalog.

@@ -140,4 +140,4 @@ SELECT * FROM flink_table;
3 rows in set
```

For more details, please refer to the Iceberg [Flink documentation](../flink.md).
For more details, please refer to the Iceberg [Flink documentation](flink.md).
2 changes: 1 addition & 1 deletion 1.5.0/docs/flink-ddl.md
@@ -152,7 +152,7 @@ Table create commands support the commonly used [Flink create clauses](https://n

* `PARTITION BY (column1, column2, ...)` to configure partitioning; Flink does not yet support hidden partitioning.
* `COMMENT 'table document'` to set a table description.
* `WITH ('key'='value', ...)` to set [table configuration](../configuration.md) which will be stored in Iceberg table properties.
* `WITH ('key'='value', ...)` to set [table configuration](configuration.md) which will be stored in Iceberg table properties.

Currently, computed columns and watermark definitions are not supported.
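
A minimal sketch combining these clauses (the table name and property values are illustrative):

```sql
CREATE TABLE sample (id BIGINT, data STRING)
COMMENT 'table document'
WITH ('format-version'='2', 'write.format.default'='parquet');
```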

2 changes: 1 addition & 1 deletion 1.5.0/docs/flink-queries.md
@@ -77,7 +77,7 @@ SET table.exec.iceberg.use-flip27-source = true;

### Reading branches and tags with SQL
Branches and tags can be read via SQL by specifying options. For more details
refer to [Flink Configuration](../flink-configuration.md#read-options)
refer to [Flink Configuration](flink-configuration.md#read-options)

```sql
--- Read from branch b1
10 changes: 5 additions & 5 deletions 1.5.0/docs/flink-writes.md
@@ -69,7 +69,7 @@ Iceberg supports `UPSERT` based on the primary key when writing data into v2 tab
) with ('format-version'='2', 'write.upsert.enabled'='true');
```

2. Enabling `UPSERT` mode using `upsert-enabled` in the [write options](#write-options) provides more flexibility than a table-level config. Note that you still need to use the v2 table format and specify the [primary key](../flink-ddl.md/#primary-key) or [identifier fields](../../spec.md#identifier-field-ids) when creating the table.
2. Enabling `UPSERT` mode using `upsert-enabled` in the [write options](#write-options) provides more flexibility than a table-level config. Note that you still need to use the v2 table format and specify the [primary key](flink-ddl.md/#primary-key) or [identifier fields](../../spec.md#identifier-field-ids) when creating the table.

```sql
INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
@@ -187,7 +187,7 @@ FlinkSink.builderFor(

### Branch Writes
Writing to branches in Iceberg tables is also supported via the `toBranch` API in `FlinkSink`.
For more information on branches please refer to [branches](../branching.md).
For more information on branches please refer to [branches](branching.md).
```java
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
@@ -264,13 +264,13 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...
```

Check out all the options here: [write-options](../flink-configuration.md#write-options)
Check out all the options here: [write-options](flink-configuration.md#write-options)

## Notes

Flink streaming write jobs rely on snapshot summary to keep the last committed checkpoint ID, and
store uncommitted data as temporary files. Therefore, [expiring snapshots](../maintenance.md#expire-snapshots)
and [deleting orphan files](../maintenance.md#delete-orphan-files) could possibly corrupt
store uncommitted data as temporary files. Therefore, [expiring snapshots](maintenance.md#expire-snapshots)
and [deleting orphan files](maintenance.md#delete-orphan-files) could possibly corrupt
the state of the Flink job. To avoid that, make sure to keep the last snapshot created by the Flink
job (which can be identified by the `flink.job-id` property in the summary), and only delete
orphan files that are old enough.
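
When running snapshot expiration, the Flink job's last snapshot can be retained with `retain_last`, e.g. via the Spark `expire_snapshots` procedure (a sketch; the catalog and table names and the timestamp are illustrative):

```sql
-- Keeps at least the most recent snapshot even if it is older than the cutoff.
CALL my_catalog.system.expire_snapshots(
  table => 'db.flink_table',
  older_than => TIMESTAMP '2024-01-01 00:00:00',
  retain_last => 1
);
```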
35 changes: 18 additions & 17 deletions 1.5.0/docs/flink.md
@@ -24,22 +24,22 @@ search:

Apache Iceberg supports both [Apache Flink](https://flink.apache.org/)'s DataStream API and Table API. See the [Multi-Engine Support](../../multi-engine-support.md#apache-flink) page for the integration of Apache Flink.

| Feature support | Flink | Notes |
| ----------------------------------------------------------- |-------|----------------------------------------------------------------------------------------|
| [SQL create catalog](../flink-ddl.md#create-catalog) | ✔️ | |
| [SQL create database](../flink-ddl.md#create-database) | ✔️ | |
| [SQL create table](../flink-ddl.md#create-table) | ✔️ | |
| [SQL create table like](../flink-ddl.md#create-table-like) | ✔️ | |
| [SQL alter table](../flink-ddl.md#alter-table) | ✔️ | Only altering table properties is supported; column and partition changes are not supported |
| [SQL drop_table](../flink-ddl.md#drop-table) | ✔️ | |
| [SQL select](../flink-queries.md#reading-with-sql) | ✔️ | Supports both streaming and batch modes |
| [SQL insert into](../flink-writes.md#insert-into) | ✔️ | Supports both streaming and batch modes |
| [SQL insert overwrite](../flink-writes.md#insert-overwrite) | ✔️ ️ | |
| [DataStream read](../flink-queries.md#reading-with-datastream) | ✔️ ️ | |
| [DataStream append](../flink-writes.md#appending-data) | ✔️ ️ | |
| [DataStream overwrite](../flink-writes.md#overwrite-data) | ✔️ ️ | |
| [Metadata tables](../flink-queries.md#inspecting-tables) | ✔️ | |
| [Rewrite files action](../flink-actions.md#rewrite-files-action) | ✔️ ️ | |
| Feature support | Flink | Notes |
| -------------------------------------------------------- |-------|----------------------------------------------------------------------------------------|
| [SQL create catalog](flink-ddl.md#create-catalog) | ✔️ | |
| [SQL create database](flink-ddl.md#create-database) | ✔️ | |
| [SQL create table](flink-ddl.md#create-table) | ✔️ | |
| [SQL create table like](flink-ddl.md#create-table-like) | ✔️ | |
| [SQL alter table](flink-ddl.md#alter-table) | ✔️ | Only altering table properties is supported; column and partition changes are not supported |
| [SQL drop_table](flink-ddl.md#drop-table) | ✔️ | |
| [SQL select](flink-queries.md#reading-with-sql) | ✔️ | Supports both streaming and batch modes |
| [SQL insert into](flink-writes.md#insert-into) | ✔️ | Supports both streaming and batch modes |
| [SQL insert overwrite](flink-writes.md#insert-overwrite) | ✔️ ️ | |
| [DataStream read](flink-queries.md#reading-with-datastream) | ✔️ ️ | |
| [DataStream append](flink-writes.md#appending-data) | ✔️ ️ | |
| [DataStream overwrite](flink-writes.md#overwrite-data) | ✔️ ️ | |
| [Metadata tables](flink-queries.md#inspecting-tables) | ✔️ | |
| [Rewrite files action](flink-actions.md#rewrite-files-action) | ✔️ ️ | |

## Preparation when using Flink SQL Client

@@ -71,6 +71,7 @@ export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/start-cluster.sh
```

<!-- markdown-link-check-disable-next-line -->
Start the Flink SQL client. There is a separate `flink-runtime` module in the Iceberg project to generate a bundled jar, which could be loaded by Flink SQL client directly. To build the `flink-runtime` bundled jar manually, build the `iceberg` project, and it will generate the jar under `<iceberg-root-dir>/flink-runtime/build/libs`. Or download the `flink-runtime` jar from the [Apache repository](https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.16/{{ icebergVersion }}/).

```bash
@@ -273,7 +274,7 @@ env.execute("Test Iceberg DataStream");

### Branch Writes
Writing to branches in Iceberg tables is also supported via the `toBranch` API in `FlinkSink`.
For more information on branches please refer to [branches](../branching.md).
For more information on branches please refer to [branches](branching.md).
```java
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
4 changes: 2 additions & 2 deletions 1.5.0/docs/spark-configuration.md
@@ -80,7 +80,7 @@ Both catalogs are configured using properties nested under the catalog name. Com
| spark.sql.catalog._catalog-name_.table-default._propertyKey_ | | Default Iceberg table property value for property key _propertyKey_, which will be set on tables created by this catalog if not overridden |
| spark.sql.catalog._catalog-name_.table-override._propertyKey_ | | Enforced Iceberg table property value for property key _propertyKey_, which cannot be overridden by user |

Additional properties can be found in common [catalog configuration](../configuration.md#catalog-properties).
Additional properties can be found in common [catalog configuration](configuration.md#catalog-properties).


### Using catalogs
@@ -187,7 +187,7 @@ df.write
| fanout-enabled | false | Overrides this table's write.spark.fanout.enabled |
| check-ordering | true | Checks if input schema and table schema are the same |
| isolation-level | null | Desired isolation level for Dataframe overwrite operations. `null` => no checks (for idempotent writes), `serializable` => check for concurrent inserts or deletes in destination partitions, `snapshot` => checks for concurrent deletes in destination partitions. |
| validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via [Table API](../api.md#table-metadata) or [Snapshots table](../spark-queries.md#snapshots). If null, the table's oldest known snapshot is used. |
| validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via [Table API](api.md#table-metadata) or [Snapshots table](spark-queries.md#snapshots). If null, the table's oldest known snapshot is used. |
| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write |
| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |