-
Notifications
You must be signed in to change notification settings - Fork 47
Iceberg value_schema_latest mode #1068
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 17 commits
98ea0e3
7efa089
edea27d
b1cd821
a6519d0
bb13c76
bc50015
bdebe7e
7a9d836
b6914f5
a29f2c4
b5e26af
1b50f17
172eebf
756eda7
3e4e2ef
9759e8d
e9c6743
f8b6619
4e3fca9
1136e9e
b0d933c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,231 @@ | ||||||||||
| = Choose an Iceberg Mode | ||||||||||
| :description: Learn about supported Iceberg modes and how you can integrate schemas with Iceberg topics. | ||||||||||
| :page-categories: Iceberg, Tiered Storage, Management, High Availability, Data Replication, Integration | ||||||||||
| :schema-id-val-doc: manage:schema-reg/schema-id-validation.adoc | ||||||||||
| // tag::single-source[] | ||||||||||
|
|
||||||||||
| ifndef::env-cloud[] | ||||||||||
| [NOTE] | ||||||||||
| ==== | ||||||||||
| include::shared:partial$enterprise-license.adoc[] | ||||||||||
| ==== | ||||||||||
| endif::[] | ||||||||||
|
|
||||||||||
| In xref:manage:iceberg/about-iceberg-topics.adoc#enable-iceberg-integration[Iceberg-enabled clusters], the `redpanda.iceberg.mode` topic property determines how Redpanda maps topic data to the Iceberg table structure. You can have the generated Iceberg table match the structure of an Avro or Protobuf schema in the Schema Registry, or you can use the `key_value` mode where Redpanda stores the record values as-is in the table. | ||||||||||
|
|
||||||||||
| NOTE: The JSON Schema format is not supported for Iceberg topics. If your topic data is in JSON, use the `key_value` mode. | ||||||||||
|
|
||||||||||
| == Supported Iceberg modes | ||||||||||
|
|
||||||||||
| ifndef::env-cloud[] | ||||||||||
| Redpanda supports the following xref:reference:properties/topic-properties.adoc#redpanda-iceberg-mode[modes] for Iceberg topics: | ||||||||||
| endif::[] | ||||||||||
|
|
||||||||||
| ifdef::env-cloud[] | ||||||||||
| Redpanda supports the following modes for Iceberg topics: | ||||||||||
| endif::[] | ||||||||||
|
|
||||||||||
| === key_value | ||||||||||
|
|
||||||||||
| Creates an Iceberg table using a simple schema, consisting of two columns, one for the record metadata including the key, and another binary column for the record's value. | ||||||||||
|
|
||||||||||
| === value_schema_id_prefix | ||||||||||
|
|
||||||||||
| Creates an Iceberg table whose structure matches the Redpanda schema for the topic, with columns corresponding to each field. You must register a schema in the xref:manage:schema-reg/schema-reg-overview.adoc[Schema Registry] and producers must write to the topic using the Schema Registry wire format. | ||||||||||
|
|
||||||||||
| In the xref:manage:schema-reg/schema-reg-overview.adoc#serialization-and-deserialization[Schema Registry wire format], a "magic byte" and schema ID are embedded in the message payload header. Producers to the topic must use the wire format in the serialization process so Redpanda can determine the schema used for each record, use the schema to define the Iceberg table, and store the topic values in the corresponding table columns. | ||||||||||
|
||||||||||
|
|
||||||||||
| === value_schema_latest | ||||||||||
|
|
||||||||||
| Creates an Iceberg table whose structure matches the latest schema registered for the subject in the Schema Registry. You must register a schema in the Schema Registry. Unlike the `value_schema_id_prefix` mode, `value_schema_latest` does not require that producers use the wire format. | ||||||||||
|
|
||||||||||
| The latest schema is cached periodically. The cache period is defined by the cluster property `iceberg_latest_schema_cache_ttl_ms` (default: 5 minutes). | ||||||||||
|
|
||||||||||
| === disabled | ||||||||||
|
|
||||||||||
| Default for `redpanda.iceberg.mode`. Disables writing to an Iceberg table for the topic. | ||||||||||
|
|
||||||||||
| == Configure Iceberg mode for a topic | ||||||||||
|
|
||||||||||
| You can set the Iceberg mode for a topic when you create the topic, or you can update the mode for an existing topic. | ||||||||||
|
|
||||||||||
| .Option 1. Create a new topic and set `redpanda.iceberg.mode`: | ||||||||||
| [,bash] | ||||||||||
| ---- | ||||||||||
| rpk topic create <topic-name> --topic-config=redpanda.iceberg.mode=<iceberg-mode> | ||||||||||
| ---- | ||||||||||
|
|
||||||||||
| .Option 2. Set `redpanda.iceberg.mode` for an existing topic: | ||||||||||
| [,bash] | ||||||||||
| ---- | ||||||||||
| rpk topic alter-config <new-topic-name> --set redpanda.iceberg.mode=<iceberg-mode> | ||||||||||
| ---- | ||||||||||
|
|
||||||||||
| [[override-value-schema-latest-default]] | ||||||||||
| === Override `value_schema_latest` default | ||||||||||
|
|
||||||||||
| ifndef::env-cloud[] | ||||||||||
| In `value_schema_latest` mode, you only need to set the property value to the string `value_schema_latest`. This enables the default behavior of `value_schema_latest` mode, which determines the subject for the topic using the xref:manage:schema-reg/schema-id-validation.adoc#set-subject-name-strategy-per-topic[TopicNameStrategy]. For Protobuf data, the default behavior also deserializes records using the first message defined in the corresponding Protobuf schema stored in the Schema Registry. | ||||||||||
| endif::[] | ||||||||||
|
|
||||||||||
| ifdef::env-cloud[] | ||||||||||
| In `value_schema_latest` mode, you only need to set the property value to the string `value_schema_latest`. This enables the default behavior of `value_schema_latest` mode, which determines the subject for the topic using the TopicNameStrategy. For Protobuf data, the default behavior also deserializes records using the first message defined in the corresponding Protobuf schema stored in the Schema Registry. | ||||||||||
| endif::[] | ||||||||||
|
|
||||||||||
| If you use a different strategy other than the topic name to derive the subject name, you can override the default behavior of `value_schema_latest` mode and explicitly set the subject name. | ||||||||||
|
|
||||||||||
| To override the default behavior, use the following optional syntax: | ||||||||||
kbatuigas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
|
|
||||||||||
| [,bash] | ||||||||||
| ---- | ||||||||||
| value_schema_latest:subject=<subject-name>,protobuf_name=<protobuf-message-full-name> | ||||||||||
| ---- | ||||||||||
|
|
||||||||||
| * For both Avro and Protobuf, specify a different subject name by using the key-value pair `subject=<subject-name>`, for example `value_schema_latest:subject=sensor-data`. | ||||||||||
| * For Protobuf only: | ||||||||||
| ** Specify a different message definition by using a key-value pair `protobuf_name=<message-name>`, for example: `value_schema_latest:protobuf_name=com.example.manufacturing.SensorData`. | ||||||||||
| ** To specify both a different subject and message definition, separate the key-value pairs with a comma, for example: `value_schema_latest:subject=my_protobuf_schema,protobuf_name=com.example.manufacturing.SensorData`. | ||||||||||
|
|
||||||||||
| == How Iceberg modes translate to table format | ||||||||||
kbatuigas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
|
|
||||||||||
| Redpanda generates an Iceberg table with the same name as the topic. In each mode, Redpanda writes to a `redpanda` table column that stores a single Iceberg https://iceberg.apache.org/spec/#nested-types[struct^] per record, containing nested columns of the metadata from each record, including the record key, headers, timestamp, the partition it belongs to, and its offset. | ||||||||||
|
|
||||||||||
| For example, if you produce to a topic `ClickEvent` according to the following Avro schema: | ||||||||||
|
|
||||||||||
| [,avro] | ||||||||||
| ---- | ||||||||||
| { | ||||||||||
| "type": "record", | ||||||||||
| "name": "ClickEvent", | ||||||||||
| "fields": [ | ||||||||||
| { | ||||||||||
| "name": "user_id", | ||||||||||
| "type": "int" | ||||||||||
| }, | ||||||||||
| { | ||||||||||
| "name": "event_type", | ||||||||||
| "type": "string" | ||||||||||
| }, | ||||||||||
| { | ||||||||||
| "name": "ts", | ||||||||||
| "type": "string" | ||||||||||
| } | ||||||||||
| ] | ||||||||||
| } | ||||||||||
| ---- | ||||||||||
|
|
||||||||||
| The `key_value` mode writes to the following table format: | ||||||||||
|
|
||||||||||
| [,sql] | ||||||||||
| ---- | ||||||||||
| CREATE TABLE ClickEvent ( | ||||||||||
| redpanda struct< | ||||||||||
| partition: integer NOT NULL, | ||||||||||
| timestamp: timestamp NOT NULL, | ||||||||||
| offset: long NOT NULL, | ||||||||||
| headers: array<struct<key: binary NOT NULL, value: binary>>, | ||||||||||
| key: binary | ||||||||||
| >, | ||||||||||
| value binary | ||||||||||
| ) | ||||||||||
| ---- | ||||||||||
|
|
||||||||||
| Use `key_value` mode if the topic data is in JSON or if you can use the Iceberg data in its semi-structured format. | ||||||||||
|
||||||||||
|
|
||||||||||
| The `value_schema_id_prefix` and `value_schema_latest` modes can use the schema to translate to the following table format: | ||||||||||
|
|
||||||||||
| [,sql] | ||||||||||
| ---- | ||||||||||
| CREATE TABLE ClickEvent ( | ||||||||||
| redpanda struct< | ||||||||||
| partition: integer NOT NULL, | ||||||||||
| timestamp: timestamp NOT NULL, | ||||||||||
| offset: long NOT NULL, | ||||||||||
| headers: array<struct<key: binary NOT NULL, value: binary>>, | ||||||||||
| key: binary | ||||||||||
| >, | ||||||||||
| user_id integer NOT NULL, | ||||||||||
| event_type string, | ||||||||||
| ts string | ||||||||||
| ) | ||||||||||
| ---- | ||||||||||
|
|
||||||||||
| As you produce records to the topic, the data also becomes available in object storage for Iceberg-compatible clients to consume. You can use the same analytical tools to xref:manage:iceberg/query-iceberg-topics.adoc[read the Iceberg topic data] in a data lake as you would for a relational database. | ||||||||||
|
|
||||||||||
| If Redpanda fails to translate the record to the columnar format as defined by the schema, it writes the record to a dead-letter queue (DLQ) table. See xref:manage:iceberg/about-iceberg-topics.adoc#manage-dead-letter-queue[Manage dead-letter queue] for more information. | ||||||||||
|
|
||||||||||
| === Schema types translation | ||||||||||
|
|
||||||||||
| Redpanda supports direct translations of the following types to Iceberg value domains: | ||||||||||
|
|
||||||||||
| [tabs] | ||||||||||
| ====== | ||||||||||
| Avro:: | ||||||||||
| + | ||||||||||
| -- | ||||||||||
| |=== | ||||||||||
| | Avro type | Iceberg type | ||||||||||
|
|
||||||||||
| | boolean | boolean | ||||||||||
| | int | int | ||||||||||
| | long | long | ||||||||||
| | float | float | ||||||||||
| | double | double | ||||||||||
| | bytes | binary | ||||||||||
| | string | string | ||||||||||
| | record | struct | ||||||||||
| | array | list | ||||||||||
| | maps | list | ||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix Avro map mapping. The table lists Avro -| maps | list
+| maps | map📝 Committable suggestion
Suggested change
|
||||||||||
| | fixed | fixed | ||||||||||
| | decimal | decimal | ||||||||||
| | uuid | uuid | ||||||||||
| | date | date | ||||||||||
| | time | time | ||||||||||
| | timestamp | timestamp | ||||||||||
| |=== | ||||||||||
|
|
||||||||||
| * Different flavors of time (such as `time-millis`) and timestamp (such as `timestamp-millis`) types are translated to the same Iceberg `time` and `timestamp` types, respectively. | ||||||||||
| * Avro unions are flattened to Iceberg structs with optional fields. For example: | ||||||||||
| ** The union `["int", "long", "float"]` is represented as an Iceberg struct `struct<0 INT NULLABLE, 1 LONG NULLABLE, 2 FLOAT NULLABLE>`. | ||||||||||
| ** The union `["int", null, "float"]` is represented as an Iceberg struct `struct<0 INT NULLABLE, 1 FLOAT NULLABLE>`. | ||||||||||
| * All fields are required by default. (Avro always sets a default in binary representation.) | ||||||||||
| * The Avro duration logical type is ignored. | ||||||||||
| * The Avro null type is ignored and not represented in the Iceberg schema. | ||||||||||
| * Recursive types are not supported. | ||||||||||
| -- | ||||||||||
|
|
||||||||||
| Protobuf:: | ||||||||||
| + | ||||||||||
| -- | ||||||||||
| |=== | ||||||||||
| | Protobuf type | Iceberg type | ||||||||||
|
|
||||||||||
| | bool | boolean | ||||||||||
| | double | double | ||||||||||
| | float | float | ||||||||||
| | int32 | int | ||||||||||
| | sint32 | int | ||||||||||
| | int64 | long | ||||||||||
| | sint64 | long | ||||||||||
| | sfixed32 | int | ||||||||||
| | sfixed64 | int | ||||||||||
| | string | string | ||||||||||
|
Comment on lines
+211
to
+212
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct Protobuf fixed‑width mapping. Currently -| sfixed64 | int
+| sfixed64 | long📝 Committable suggestion
Suggested change
|
||||||||||
| | bytes | binary | ||||||||||
| | map | map | ||||||||||
| |=== | ||||||||||
|
|
||||||||||
| * Repeated values are translated into Iceberg `array` types. | ||||||||||
| * Enums are translated into Iceberg `int` types based on the integer value of the enumerated type. | ||||||||||
| * `uint32` and `fixed32` are translated into Iceberg `long` types as that is the existing semantic for unsigned 32-bit values in Iceberg. | ||||||||||
| * `uint64` and `fixed64` values are translated into their Base-10 string representation. | ||||||||||
| * The `timestamp` type in Protobuf is translated into `timestamp` in Iceberg. | ||||||||||
| * Messages are converted into Iceberg structs. | ||||||||||
| * Recursive types are not supported. | ||||||||||
| -- | ||||||||||
| ====== | ||||||||||
|
|
||||||||||
| // end::single-source[] | ||||||||||
|
|
||||||||||
| == See also | ||||||||||
|
|
||||||||||
| - xref:reference:properties/topic-properties.adoc#redpanda-iceberg-mode[`redpanda.iceberg.mode` topic property reference] | ||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,7 +7,7 @@ | |
| include::shared:partial$enterprise-license.adoc[] | ||
| ==== | ||
|
|
||
| When you access Iceberg topics from a data lakehouse or other Iceberg-compatible tools, how you consume the data depends on the topic xref:manage:iceberg/about-iceberg-topics.adoc#enable-iceberg-integration[Iceberg mode] and whether you've registered a schema for the topic in the xref:manage:schema-reg/schema-reg-overview.adoc[Redpanda Schema Registry]. In either mode, you do not need to rely on complex ETL jobs or pipelines to access real-time data from Redpanda. | ||
| When you access Iceberg topics from a data lakehouse or other Iceberg-compatible tools, how you consume the data depends on the topic xref:manage:iceberg/choose-iceberg-mode.adoc[Iceberg mode] and whether you've registered a schema for the topic in the xref:manage:schema-reg/schema-reg-overview.adoc[Redpanda Schema Registry]. You do not need to rely on complex ETL jobs or pipelines to access real-time data from Redpanda. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kbatuigas this page has examples using the other mode but no mention of "value_schema_latest" mode at all. Even if we don't have an example it's probably worth a mention how querying works in this mode (essentially the same as with value_schema_id_prefix) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added note |
||
|
|
||
| include::manage:partial$iceberg/query-iceberg-topics.adoc[] | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,7 @@ | |
| // tag::single-source[] | ||
| :description: Redpanda's Schema Registry provides the interface to store and manage event schemas. | ||
|
|
||
| In Redpanda, the messages exchanged between producers and consumers contain raw bytes. Schemas enable producers and consumers to share the information needed to serialize and deserialize those messages. They register and retrieve the schemas they use in the Schema Registry to ensure data verification. | ||
| In Redpanda, the messages exchanged between producers and consumers contain raw bytes. Schemas enable producers and consumers to share the information needed to <<serialization-and-deserialization,serialize and deserialize>> those messages. They register and retrieve the schemas they use in the Schema Registry to ensure data verification. | ||
|
|
||
| Schemas are versioned, and the registry supports configurable compatibility modes between schema versions. When a producer or a consumer requests to register a schema change, the registry checks for schema compatibility and returns an error for an incompatible change. Compatibility modes can ensure that data flowing through a system is well-structured and easily evolves. | ||
|
|
||
|
|
@@ -33,7 +33,39 @@ See the xref:reference:cluster-properties.adoc#kafka_nodelete_topics[kafka_nodel | |
|
|
||
| endif::[] | ||
|
|
||
| Redpanda Schema Registry uses the default port 8081. | ||
| Redpanda Schema Registry uses the default port 8081. | ||
|
|
||
| == Serialization and deserialization | ||
|
||
|
|
||
| With Schema Registry, producers and consumers can use a specific message format, called the wire format. The wire format facilitates a seamless transfer of data by ensuring that clients easily access the correct schema in the Schema Registry for a message. | ||
|
|
||
| The wire format is a sequence of bytes consisting of the following: | ||
|
|
||
| . The "magic byte," a single byte that always contains the value of 0. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh good--you defined it here. thx |
||
| . A four-byte integer containing the schema ID. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. technically for protobuf there is additionally a series of variants as well encoding which protobuf message in the protobuf schema was used. I don't feel strongly about if we need to call that out however. |
||
| . The rest of the serialized message. | ||
|
|
||
| image::shared:schema-registry-wire-format.png[alt="Schema Registry wire format"] | ||
|
|
||
| ifndef::env-cloud[] | ||
| In the serialization process, the producer hands over the message to a key/value serializer that is part of the respective language-specific SDK. The serializer first checks whether the schema ID for the given subject exists in the local schema cache. The serializer derives the subject name based on several xref:manage:schema-reg/schema-id-validation.adoc#set-subject-name-strategy-per-topic[strategies], such as the topic name. You can also explicitly set the subject name. | ||
| endif::[] | ||
|
|
||
| ifdef::env-cloud[] | ||
| In the serialization process, the producer hands over the message to a key/value serializer that is part of the respective language-specific SDK. The serializer first checks whether the schema ID for the given subject exists in the local schema cache. The serializer derives the subject name based on several strategies, such as the topic name. You can also explicitly set the subject name. | ||
| endif::[] | ||
|
|
||
| If the schema ID isn’t in the cache, the serializer registers the schema in the Schema Registry and collects the resulting schema ID in the response. | ||
|
|
||
| In either case, when the serializer has the schema ID, it pads the beginning of the message with the magic byte and the encoded schema ID, and returns the byte sequence to the producer to write to the topic. | ||
kbatuigas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| In the deserialization process, the consumer fetches messages from the broker and hands them over to a deserializer. The deserializer first checks the presence of the magic byte and rejects the message if it doesn't follow the wire format. | ||
|
|
||
| The deserializer then reads the schema ID and checks whether that schema exists in its local cache. If it finds the schema, it deserializes the message according to that schema. Otherwise, the deserializer retrieves the schema from the Schema Registry using the schema ID, then the deserializer proceeds with deserialization. | ||
|
|
||
| ifndef::env-cloud[] | ||
| See also: xref:manage:schema-reg/schema-id-validation.adoc[] | ||
kbatuigas marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| endif::[] | ||
|
|
||
| == Schema examples | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Link snapshot expiry to its documentation anchor.
The bullet for Snapshot expiry is missing a cross-reference. Please add an
xrefto the corresponding section inabout-iceberg-topics.adoc(e.g.,#snapshot-expiry) for consistency with the other bullets.