diff --git a/docs/config.toml b/docs/config.toml index ab0c8f968..64b63c4c4 100644 --- a/docs/config.toml +++ b/docs/config.toml @@ -7,6 +7,6 @@ theme= "hugo-book" BookTheme = 'auto' BookLogo = "img/iceberg-logo-icon.png" versions.iceberg = "" # This is populated by the github deploy workflow and is equal to the branch name - versions.nessie = "0.17.0" + versions.nessie = "0.18.0" latestVersions.iceberg = "0.13.0" # This is used for the version badge on the "latest" site version BookSection='docs' # This determines which directory will inform the left navigation menu diff --git a/docs/content/docs/api/java-api.md b/docs/content/docs/api/java-api.md index fb0ed88b7..a7acc3cd3 100644 --- a/docs/content/docs/api/java-api.md +++ b/docs/content/docs/api/java-api.md @@ -176,8 +176,9 @@ StructType struct = Struct.of( ```java // map<1 key: int, 2 value: optional string> MapType map = MapType.ofOptional( - 1, Types.IntegerType.get(), - 2, Types.StringType.get() + 1, 2, + Types.IntegerType.get(), + Types.StringType.get() ) ``` ```java @@ -203,6 +204,7 @@ Supported predicate expressions are: * `in` * `notIn` * `startsWith` +* `notStartsWith` Supported expression operations are: diff --git a/docs/content/docs/dremio/_index.md b/docs/content/docs/dremio/_index.md new file mode 100644 index 000000000..644ab0b0b --- /dev/null +++ b/docs/content/docs/dremio/_index.md @@ -0,0 +1,23 @@ +--- +title: "Dremio" +bookIconImage: ../img/dremio-logo.png +bookFlatSection: true +weight: 430 +bookExternalUrlNewWindow: https://docs.dremio.com/data-formats/apache-iceberg/ +--- + \ No newline at end of file diff --git a/docs/content/docs/flink/flink-getting-started.md b/docs/content/docs/flink/flink-getting-started.md index b4d55182d..ebb2153c0 100644 --- a/docs/content/docs/flink/flink-getting-started.md +++ b/docs/content/docs/flink/flink-getting-started.md @@ -22,25 +22,25 @@ url: flink # Flink -Apache Iceberg supports both [Apache Flink](https://flink.apache.org/)'s DataStream API and Table API to write records into an Iceberg table. Currently, -we only integrate Iceberg with Apache Flink 1.11.x. - -| Feature support | Flink 1.11.0 | Notes | -|------------------------------------------------------------------------|--------------------|--------------------------------------------------------| -| [SQL create catalog](#creating-catalogs-and-using-catalogs) | ✔️ | | -| [SQL create database](#create-database) | ✔️ | | -| [SQL create table](#create-table) | ✔️ | | -| [SQL create table like](#create-table-like) | ✔️ | | -| [SQL alter table](#alter-table) | ✔️ | Only support altering table properties, Columns/PartitionKey changes are not supported now| -| [SQL drop_table](#drop-table) | ✔️ | | -| [SQL select](#querying-with-sql) | ✔️ | Support both streaming and batch mode | -| [SQL insert into](#insert-into) | ✔️ ️ | Support both streaming and batch mode | -| [SQL insert overwrite](#insert-overwrite) | ✔️ ️ | | -| [DataStream read](#reading-with-datastream) | ✔️ ️ | | -| [DataStream append](#appending-data) | ✔️ ️ | | -| [DataStream overwrite](#overwrite-data) | ✔️ ️ | | -| [Metadata tables](#inspecting-tables) | ️ | Support Java API but does not support Flink SQL | -| [Rewrite files action](#rewrite-files-action) | ✔️ ️ | | +Apache Iceberg supports both [Apache Flink](https://flink.apache.org/)'s DataStream API and Table API. Currently, +Iceberg integration for Apache Flink is available for Flink versions 1.12, 1.13, and 1.14. Previous versions of Iceberg also support Flink 1.11. 
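+
+As a quick orientation before the feature matrix below, the SQL integration amounts to registering an Iceberg catalog from the Flink SQL client and then running ordinary SQL against it. The snippet below is only a minimal sketch: it assumes a Hive-backed catalog, and the catalog name, metastore URI, and warehouse path are placeholders. Catalog creation is covered in detail in the sections linked from the table.
+
+```sql
+-- register an Iceberg catalog backed by a Hive Metastore (URI and warehouse path are placeholders)
+CREATE CATALOG hive_catalog WITH (
+  'type'='iceberg',
+  'catalog-type'='hive',
+  'uri'='thrift://localhost:9083',
+  'warehouse'='hdfs://nn:8020/warehouse/path'
+);
+
+USE CATALOG hive_catalog;
+```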
+ +| Feature support | Flink | Notes | +| ----------------------------------------------------------- | ----- | ------------------------------------------------------------ | +| [SQL create catalog](#creating-catalogs-and-using-catalogs) | ✔️ | | +| [SQL create database](#create-database) | ✔️ | | +| [SQL create table](#create-table) | ✔️ | | +| [SQL create table like](#create-table-like) | ✔️ | | +| [SQL alter table](#alter-table) | ✔️ | Only support altering table properties, column and partition changes are not supported | +| [SQL drop_table](#drop-table) | ✔️ | | +| [SQL select](#querying-with-sql) | ✔️ | Support both streaming and batch mode | +| [SQL insert into](#insert-into) | ✔️ ️ | Support both streaming and batch mode | +| [SQL insert overwrite](#insert-overwrite) | ✔️ ️ | | +| [DataStream read](#reading-with-datastream) | ✔️ ️ | | +| [DataStream append](#appending-data) | ✔️ ️ | | +| [DataStream overwrite](#overwrite-data) | ✔️ ️ | | +| [Metadata tables](#inspecting-tables) | ️ | Support Java API but does not support Flink SQL | +| [Rewrite files action](#rewrite-files-action) | ✔️ ️ | | ## Preparation when using Flink SQL Client diff --git a/docs/content/docs/hive/_index.md b/docs/content/docs/hive/_index.md index fd6c0a049..cf4712110 100644 --- a/docs/content/docs/hive/_index.md +++ b/docs/content/docs/hive/_index.md @@ -79,6 +79,14 @@ catalog.createTable(tableId, schema, spec, tableProperties); The table level configuration overrides the global Hadoop configuration. +#### Hive on Tez configuration + +To use the Tez engine on Hive `3.1.2` or later, Tez needs to be upgraded to >= `0.10.1` which contains a necessary fix [Tez-4248](https://issues.apache.org/jira/browse/TEZ-4248). + +To use the Tez engine on Hive `2.3.x`, you will need to manually build Tez from the `branch-0.9` branch due to a backwards incompatibility issue with Tez `0.10.1`. + +You will also need to set the following property in the Hive configuration: `tez.mrreader.config.update.properties=hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids`. + ## Catalog Management ### Global Hive catalog diff --git a/docs/content/docs/integrations/aws.md b/docs/content/docs/integrations/aws.md index 0eec0f5c2..9d2c73203 100644 --- a/docs/content/docs/integrations/aws.md +++ b/docs/content/docs/integrations/aws.md @@ -405,6 +405,11 @@ If for any reason you have to use S3A, here are the instructions: 3. Add [hadoop-aws](https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws) as a runtime dependency of your compute engine. 4. Configure AWS settings based on [hadoop-aws documentation](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html) (make sure you check the version, S3A configuration varies a lot based on the version you use). +### S3 Write Checksum Verification + +To ensure integrity of uploaded objects, checksum validations for S3 writes can be turned on by setting catalog property `s3.checksum-enabled` to `true`. +This is turned off by default. + ## AWS Client Customization Many organizations have customized their way of configuring AWS clients with their own credential provider, access proxy, retry strategy, etc. @@ -448,8 +453,10 @@ spark-sql --packages org.apache.iceberg:iceberg-spark3-runtime:{{% icebergVersio [Hive](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hive.html), [Flink](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html), [Trino](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-presto.html) that can run Iceberg. 
-You can use a [bootstrap action](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-bootstrap.html) similar to the following to pre-install all necessary dependencies: +Starting with EMR version 6.5.0, EMR clusters can be configured to have the necessary Apache Iceberg dependencies installed without requiring bootstrap actions. +Please refer to the [official documentation](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg-create-cluster.html) on how to create a cluster with Iceberg installed. +For versions before 6.5.0, you can use a [bootstrap action](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-bootstrap.html) similar to the following to pre-install all necessary dependencies: ```sh #!/bin/bash diff --git a/docs/content/docs/spark/spark-configuration.md b/docs/content/docs/spark/spark-configuration.md index 5962ab030..6afddfbba 100644 --- a/docs/content/docs/spark/spark-configuration.md +++ b/docs/content/docs/spark/spark-configuration.md @@ -67,6 +67,7 @@ Both catalogs are configured using properties nested under the catalog name. Com | spark.sql.catalog._catalog-name_.uri | thrift://host:port | Metastore connect URI; default from `hive-site.xml` | | spark.sql.catalog._catalog-name_.warehouse | hdfs://nn:8020/warehouse/path | Base path for the warehouse directory | | spark.sql.catalog._catalog-name_.cache-enabled | `true` or `false` | Whether to enable catalog cache, default value is `true` | +| spark.sql.catalog._catalog-name_.cache.expiration-interval-ms | `30000` (30 seconds) | Duration after which cached catalog entries are expired; Only effective if `cache-enabled` is `true`. `-1` disables cache expiration and `0` disables caching entirely, irrespective of `cache-enabled`. Default is `30000` (30 seconds) | | Additional properties can be found in common [catalog configuration](../configuration#catalog-properties). @@ -162,6 +163,7 @@ spark.read | file-open-cost | As per table property | Overrides this table's read.split.open-file-cost | | vectorization-enabled | As per table property | Overrides this table's read.parquet.vectorization.enabled | | batch-size | As per table property | Overrides this table's read.parquet.vectorization.batch-size | +| stream-from-timestamp | (none) | A timestamp in milliseconds to stream from; if before the oldest known ancestor snapshot, the oldest will be used | ### Write options diff --git a/docs/content/docs/spark/spark-ddl.md b/docs/content/docs/spark/spark-ddl.md index 55e7bfe29..94b95de26 100644 --- a/docs/content/docs/spark/spark-ddl.md +++ b/docs/content/docs/spark/spark-ddl.md @@ -370,3 +370,31 @@ ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC NULLS LAST, id DESC NUL {{< hint info >}} Table write order does not guarantee data order for queries. It only affects how data is written to the table. {{< /hint >}} + +`WRITE ORDERED BY` sets a global ordering where rows are ordered across tasks, like using `ORDER BY` in an `INSERT` command: + +```sql +INSERT INTO prod.db.sample +SELECT id, data, category, ts FROM another_table +ORDER BY ts, category +``` + +To order within each task, not across tasks, use `LOCALLY ORDERED BY`: + +```sql +ALTER TABLE prod.db.sample WRITE LOCALLY ORDERED BY category, id +``` + +### `ALTER TABLE ... WRITE DISTRIBUTED BY PARTITION` + +`WRITE DISTRIBUTED BY PARTITION` will request that each partition is handled by one writer, the default implementation is hash distribution. 
+ +```sql +ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION +``` + +`DISTRIBUTED BY PARTITION` and `LOCALLY ORDERED BY` may be used together, to distribute by partition and locally order rows within each task. + +```sql +ALTER TABLE prod.db.sample WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id +``` diff --git a/docs/content/docs/spark/spark-procedures.md b/docs/content/docs/spark/spark-procedures.md index d4649a239..c81137089 100644 --- a/docs/content/docs/spark/spark-procedures.md +++ b/docs/content/docs/spark/spark-procedures.md @@ -55,7 +55,9 @@ Roll back a table to a specific snapshot ID. To roll back to a specific time, use [`rollback_to_timestamp`](#rollback_to_timestamp). -**Note** this procedure invalidates all cached Spark plans that reference the affected table. +{{< hint info >}} +This procedure invalidates all cached Spark plans that reference the affected table. +{{< /hint >}} #### Usage @@ -83,7 +85,9 @@ CALL catalog_name.system.rollback_to_snapshot('db.sample', 1) Roll back a table to the snapshot that was current at some time. -**Note** this procedure invalidates all cached Spark plans that reference the affected table. +{{< hint info >}} +This procedure invalidates all cached Spark plans that reference the affected table. +{{< /hint >}} #### Usage @@ -112,7 +116,9 @@ Sets the current snapshot ID for a table. Unlike rollback, the snapshot is not required to be an ancestor of the current table state. -**Note** this procedure invalidates all cached Spark plans that reference the affected table. +{{< hint info >}} +This procedure invalidates all cached Spark plans that reference the affected table. +{{< /hint >}} #### Usage @@ -143,7 +149,9 @@ Cherry-picking creates a new snapshot from an existing snapshot without altering Only append and dynamic overwrite snapshots can be cherry-picked. -**Note** this procedure invalidates all cached Spark plans that reference the affected table. +{{< hint info >}} +This procedure invalidates all cached Spark plans that reference the affected table. +{{< /hint >}} #### Usage @@ -192,6 +200,9 @@ the `expire_snapshots` procedure will never remove files which are still require | `table` | ✔️ | string | Name of the table to update | | `older_than` | ️ | timestamp | Timestamp before which snapshots will be removed (Default: 5 days ago) | | `retain_last` | | int | Number of ancestor snapshots to preserve regardless of `older_than` (defaults to 1) | +| `max_concurrent_deletes` | | int | Size of the thread pool used for delete file actions (by default, no thread pool is used) | + +If `older_than` and `retain_last` are omitted, the table's [expiration properties](./configuration/#table-behavior-properties) will be used. #### Output @@ -227,6 +238,7 @@ Used to remove files which are not referenced in any metadata files of an Iceber | `older_than` | ️ | timestamp | Remove orphan files created before this timestamp (Defaults to 3 days ago) | | `location` | | string | Directory to look for files in (defaults to the table's location) | | `dry_run` | | boolean | When true, don't actually remove files (defaults to false) | +| `max_concurrent_deletes` | | int | Size of the thread pool used for delete file actions (by default, no thread pool is used) | #### Output @@ -308,7 +320,9 @@ Data files in manifests are sorted by fields in the partition spec. This procedu See the [`RewriteManifestsAction` Javadoc](../../../javadoc/{{% icebergVersion %}}/org/apache/iceberg/actions/RewriteManifestsAction.html) to see more configuration options. 
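+
+As a quick sketch before the full usage reference below (the catalog and table names are placeholders), the procedure can be called with just a table identifier; the optional second argument, `use_caching`, controls whether Spark caching is used during the operation:
+
+```sql
+-- rewrite manifests for db.sample without using Spark caching
+CALL catalog_name.system.rewrite_manifests('db.sample', false)
+```
+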
-**Note** this procedure invalidates all cached Spark plans that reference the affected table.
+{{< hint info >}}
+This procedure invalidates all cached Spark plans that reference the affected table.
+{{< /hint >}}
 
 #### Usage
 
@@ -350,11 +364,13 @@ When inserts or overwrites run on the snapshot, new files are placed in the snap
 
 When finished testing a snapshot table, clean it up by running `DROP TABLE`.
 
-**Note** Because tables created by `snapshot` are not the sole owners of their data files, they are prohibited from
+{{< hint info >}}
+Because tables created by `snapshot` are not the sole owners of their data files, they are prohibited from
 actions like `expire_snapshots` which would physically delete data files. Iceberg deletes, which only affect metadata,
 are still allowed. In addition, any operations which affect the original data files will disrupt the snapshot table's
 integrity. DELETE statements executed against the original Hive table will remove original data files and the
 `snapshot` table will no longer be able to access them.
+{{< /hint >}}
 
 See [`migrate`](#migrate) to replace an existing table with an Iceberg table.
 
diff --git a/docs/content/docs/spark/spark-queries.md b/docs/content/docs/spark/spark-queries.md
index 5b5017d74..8f139c539 100644
--- a/docs/content/docs/spark/spark-queries.md
+++ b/docs/content/docs/spark/spark-queries.md
@@ -116,6 +116,28 @@ in [Spark 3.1 - SPARK-32592](https://issues.apache.org/jira/browse/SPARK-32592).
 
 Time travel is not yet supported by Spark's SQL syntax.
 
+### Incremental read
+
+To read appended data incrementally, set the following read options:
+
+* `start-snapshot-id`: the start snapshot ID used in incremental scans (exclusive).
+* `end-snapshot-id`: the end snapshot ID used in incremental scans (inclusive). This is optional; omitting it defaults to the current snapshot.
+
+```scala
+// get the data added after start-snapshot-id (10963874102873L) until end-snapshot-id (63874143573109L)
+spark.read
+  .format("iceberg")
+  .option("start-snapshot-id", "10963874102873")
+  .option("end-snapshot-id", "63874143573109")
+  .load("path/to/table")
+```
+
+{{< hint info >}}
+Incremental reads currently return only data written by `append` operations; `replace`, `overwrite`, and `delete` operations are not supported.
+Incremental read works with both V1 and V2 format versions.
+Incremental read is not supported by Spark's SQL syntax.
+{{< /hint >}} + ### Spark 2.4 Spark 2.4 requires using the DataFrame reader with `iceberg` as a format, because 2.4 does not support direct SQL queries: @@ -223,15 +245,11 @@ To show a table's data files and each file's metadata, run: ```sql SELECT * FROM prod.db.table.files ``` -```text -+-------------------------------------------------------------------------+-------------+--------------+--------------------+--------------------+------------------+-------------------+------------------+-----------------+-----------------+--------------+---------------+ -| file_path | file_format | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | -+-------------------------------------------------------------------------+-------------+--------------+--------------------+--------------------+------------------+-------------------+------------------+-----------------+-----------------+--------------+---------------+ -| s3:/.../table/data/00000-3-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> c] | [1 -> , 2 -> c] | null | [4] | -| s3:/.../table/data/00001-4-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> b] | [1 -> , 2 -> b] | null | [4] | -| s3:/.../table/data/00002-5-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> a] | [1 -> , 2 -> a] | null | [4] | -+-------------------------------------------------------------------------+-------------+--------------+--------------------+--------------------+------------------+-------------------+------------------+-----------------+-----------------+--------------+---------------+ -``` +| content | file_path | file_format | spec_id | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id | +| -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | +| 0 | s3:/.../table/data/00000-3-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 01} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> c] | [1 -> , 2 -> c] | null | [4] | null | null | +| 0 | s3:/.../table/data/00001-4-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 02} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> b] | [1 -> , 2 -> b] | null | [4] | null | null | +| 0 | s3:/.../table/data/00002-5-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 03} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> a] | [1 -> , 2 -> a] | null | [4] | null | null | ### Manifests diff --git a/docs/content/docs/spark/spark-structured-streaming.md b/docs/content/docs/spark/spark-structured-streaming.md index 8679847b9..35dbfdc85 100644 --- a/docs/content/docs/spark/spark-structured-streaming.md +++ b/docs/content/docs/spark/spark-structured-streaming.md @@ -32,6 +32,21 @@ As of Spark 3.0, DataFrame reads and writes are supported. 
|--------------------------------------------------|----------|------------|------------------------------------------------|
| [DataFrame write](#streaming-writes)              | ✔        | ✔          |                                                  |
 
+## Streaming Reads
+
+Iceberg supports processing incremental data in Spark Structured Streaming jobs, starting from a historical timestamp:
+
+```scala
+val df = spark.readStream
+    .format("iceberg")
+    .option("stream-from-timestamp", streamStartTimestamp.toString)
+    .load("database.table_name")
+```
+
+{{< hint warning >}}
+Iceberg only supports reading data from append snapshots. Overwrite snapshots cannot be processed and will cause an exception. Similarly, delete snapshots will cause an exception by default, but deletes may be ignored by setting `streaming-skip-delete-snapshots=true`.
+{{< /hint >}}
+
 ## Streaming Writes
 
 To write values from a streaming query to an Iceberg table, use `DataStreamWriter`:
diff --git a/docs/content/docs/tables/configuration.md b/docs/content/docs/tables/configuration.md
index 78752e8b0..3a856dd30 100644
--- a/docs/content/docs/tables/configuration.md
+++ b/docs/content/docs/tables/configuration.md
@@ -43,9 +43,9 @@ Iceberg tables support table properties to configure table behavior, like the de
 | write.parquet.row-group-size-bytes | 134217728 (128 MB) | Parquet row group size |
 | write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size |
 | write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size |
-| write.parquet.compression-codec | gzip | Parquet compression codec |
+| write.parquet.compression-codec | gzip | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
 | write.parquet.compression-level | null | Parquet compression level |
-| write.avro.compression-codec | gzip | Avro compression codec |
+| write.avro.compression-codec | gzip | Avro compression codec: gzip (deflate with level 9), snappy, uncompressed |
 | write.location-provider.impl | null | Optional custom implementation for LocationProvider |
 | write.metadata.compression-codec | none | Metadata compression codec; none or gzip |
 | write.metadata.metrics.default | truncate(16) | Default metrics mode for all columns in the table; none, counts, truncate(length), or full |
@@ -77,6 +77,14 @@ Iceberg tables support table properties to configure table behavior, like the de
 | history.expire.min-snapshots-to-keep | 1 | Default min number of snapshots to keep while expiring snapshots |
 | history.expire.max-ref-age-ms | `Long.MAX_VALUE` (forever) | For snapshot references except the `main` branch, default max age of snapshot references to keep while expiring snapshots. The `main` branch never expires. |
 
+### Reserved table properties
+Reserved table properties are only used to control behaviors when creating or updating a table.
+The values of these properties are not persisted as part of the table metadata.
+
+| Property | Default | Description |
+| -------------- | -------- | ------------------------------------------------------------- |
+| format-version | 1 | Table's format version (can be 1 or 2) as defined in the [Spec](./spec.md#format-versioning). |
+
 ### Compatibility flags
 
 | Property | Default | Description |
diff --git a/docs/static/img/dremio-logo.png b/docs/static/img/dremio-logo.png
new file mode 100644
index 000000000..dd471aa06
Binary files /dev/null and b/docs/static/img/dremio-logo.png differ