Commit e68ee61

kbatuigas and Feediver1 authored
Iceberg value_schema_latest mode (#1068)
Co-authored-by: Joyce Fee <[email protected]>
1 parent 47f76c5 commit e68ee61

File tree

9 files changed: +298 −194 lines changed


modules/ROOT/nav.adoc

Lines changed: 1 addition & 0 deletions
@@ -181,6 +181,7 @@
 *** xref:manage:whole-cluster-restore.adoc[Whole Cluster Restore]
 ** xref:manage:iceberg/index.adoc[Iceberg]
 *** xref:manage:iceberg/about-iceberg-topics.adoc[About Iceberg Topics]
+*** xref:manage:iceberg/choose-iceberg-mode.adoc[Choose Iceberg Mode]
 *** xref:manage:iceberg/use-iceberg-catalogs.adoc[Use Iceberg Catalogs]
 *** xref:manage:iceberg/query-iceberg-topics.adoc[Query Iceberg Topics]
 *** xref:manage:iceberg/redpanda-topics-iceberg-snowflake-catalog.adoc[Query Iceberg Topics with Snowflake]

modules/get-started/pages/release-notes/redpanda.adoc

Lines changed: 7 additions & 1 deletion
@@ -62,7 +62,13 @@ The admin panel has been removed from the Redpanda Console UI. To manage users,

 == Iceberg improvements

-xref:manage:iceberg/about-iceberg-topics.adoc[Iceberg-enabled topics] now support custom partitioning for improved query performance, snapshot expiry, and a dead-letter queue for invalid records. Schema evolution is also supported with schema mutations implemented according to the Iceberg specification.
+Iceberg-enabled topics now support the following:
+
+- xref:manage:iceberg/about-iceberg-topics.adoc#use-custom-partitioning[Custom partitioning] for improved query performance.
+- xref:manage:iceberg/query-iceberg-topics.adoc#access-iceberg-tables[Snapshot expiry].
+- xref:manage:iceberg/about-iceberg-topics.adoc#manage-dead-letter-queue[Dead-letter queue] for invalid records.
+- xref:manage:iceberg/about-iceberg-topics.adoc#schema-evolution[Schema evolution], with schema mutations implemented according to the Iceberg specification.
+- For Avro and Protobuf data, structured Iceberg tables without the use of the Schema Registry wire format or SerDes. See xref:manage:iceberg/choose-iceberg-mode.adoc[] for more information.

 == Protobuf normalization in Schema Registry

modules/manage/pages/iceberg/choose-iceberg-mode.adoc

Lines changed: 231 additions & 0 deletions
@@ -0,0 +1,231 @@
= Choose an Iceberg Mode
:description: Learn about supported Iceberg modes and how you can integrate schemas with Iceberg topics.
:page-categories: Iceberg, Tiered Storage, Management, High Availability, Data Replication, Integration
:schema-id-val-doc: manage:schema-reg/schema-id-validation.adoc
// tag::single-source[]

ifndef::env-cloud[]
[NOTE]
====
include::shared:partial$enterprise-license.adoc[]
====
endif::[]

In xref:manage:iceberg/about-iceberg-topics.adoc#enable-iceberg-integration[Iceberg-enabled clusters], the `redpanda.iceberg.mode` topic property determines how Redpanda maps topic data to the Iceberg table structure. You can have the generated Iceberg table match the structure of an Avro or Protobuf schema in the Schema Registry, or you can use the `key_value` mode, in which Redpanda stores record values as-is in the table.

NOTE: The JSON Schema format is not supported for Iceberg topics. If your topic data is in JSON, use the `key_value` mode.

== Supported Iceberg modes

ifndef::env-cloud[]
Redpanda supports the following xref:reference:properties/topic-properties.adoc#redpanda-iceberg-mode[modes] for Iceberg topics:
endif::[]

ifdef::env-cloud[]
Redpanda supports the following modes for Iceberg topics:
endif::[]

=== key_value

Creates an Iceberg table using a simple two-column schema: one column for the record metadata, including the key, and a binary column for the record's value.

=== value_schema_id_prefix

Creates an Iceberg table whose structure matches the Redpanda schema for the topic, with columns corresponding to each field. You must register a schema in the xref:manage:schema-reg/schema-reg-overview.adoc[Schema Registry], and producers must write to the topic using the Schema Registry wire format.

In the xref:manage:schema-reg/schema-reg-overview.adoc#wire-format[Schema Registry wire format], a "magic byte" and schema ID are embedded in the message payload header. Producers to the topic must use the wire format in the serialization process so Redpanda can determine the schema used for each record, use the schema to define the Iceberg table, and store the topic values in the corresponding table columns.

=== value_schema_latest

Creates an Iceberg table whose structure matches the latest schema registered for the subject in the Schema Registry. You must register a schema in the Schema Registry. Unlike the `value_schema_id_prefix` mode, `value_schema_latest` does not require producers to use the wire format.

The latest schema is cached periodically. The cache period is defined by the cluster property `iceberg_latest_schema_cache_ttl_ms` (default: 5 minutes).

=== disabled

The default for `redpanda.iceberg.mode`. Disables writing to an Iceberg table for the topic.

== Configure Iceberg mode for a topic

You can set the Iceberg mode for a topic when you create the topic, or you can update the mode for an existing topic.

.Option 1. Create a new topic and set `redpanda.iceberg.mode`:
[,bash]
----
rpk topic create <topic-name> --topic-config=redpanda.iceberg.mode=<iceberg-mode>
----

.Option 2. Set `redpanda.iceberg.mode` for an existing topic:
[,bash]
----
rpk topic alter-config <topic-name> --set redpanda.iceberg.mode=<iceberg-mode>
----

[[override-value-schema-latest-default]]
=== Override `value_schema_latest` default

ifndef::env-cloud[]
In `value_schema_latest` mode, you only need to set the property value to the string `value_schema_latest`. This enables the default behavior of `value_schema_latest` mode, which determines the subject for the topic using the xref:manage:schema-reg/schema-id-validation.adoc#set-subject-name-strategy-per-topic[TopicNameStrategy]. For example, if your topic is named `sensor`, the schema is looked up in the `sensor-value` subject. For Protobuf data, the default behavior also deserializes records using the first message defined in the corresponding Protobuf schema stored in the Schema Registry.
endif::[]

ifdef::env-cloud[]
In `value_schema_latest` mode, you only need to set the property value to the string `value_schema_latest`. This enables the default behavior of `value_schema_latest` mode, which determines the subject for the topic using the TopicNameStrategy. For example, if your topic is named `sensor`, the schema is looked up in the `sensor-value` subject. For Protobuf data, the default behavior also deserializes records using the first message defined in the corresponding Protobuf schema stored in the Schema Registry.
endif::[]

If you use a strategy other than the topic name to derive the subject name, you can override the default behavior of `value_schema_latest` mode and explicitly set the subject name.

To override the default behavior, use the following optional syntax:

[,bash]
----
value_schema_latest:subject=<subject-name>,protobuf_name=<protobuf-message-full-name>
----

* For both Avro and Protobuf, specify a different subject name by using the key-value pair `subject=<subject-name>`, for example: `value_schema_latest:subject=sensor-data`.
* For Protobuf only:
** Specify a different message definition by using the key-value pair `protobuf_name=<message-name>`, for example: `value_schema_latest:protobuf_name=com.example.manufacturing.SensorData`.
** To specify both a different subject and message definition, separate the key-value pairs with a comma, for example: `value_schema_latest:subject=my_protobuf_schema,protobuf_name=com.example.manufacturing.SensorData`.
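
As a mental model, the mode string is the mode name, optionally followed by `:` and comma-separated key-value overrides, with the TopicNameStrategy supplying the default subject. The following parser is a hypothetical sketch for illustration only; `parse_iceberg_mode` is not part of Redpanda or rpk:

```python
def parse_iceberg_mode(topic: str, mode: str):
    """Resolve the subject and Protobuf message name from a
    redpanda.iceberg.mode string such as
    'value_schema_latest:subject=sensor-data,protobuf_name=com.example.SensorData'.
    Hypothetical helper, shown only to illustrate the syntax."""
    name, _, overrides = mode.partition(":")
    # Default behavior: TopicNameStrategy derives the subject as '<topic>-value'.
    subject = f"{topic}-value"
    protobuf_name = None  # default: first message defined in the Protobuf schema
    if overrides:
        for pair in overrides.split(","):
            key, _, value = pair.partition("=")
            if key == "subject":
                subject = value
            elif key == "protobuf_name":
                protobuf_name = value
    return name, subject, protobuf_name
```

For a topic named `sensor` with the bare mode string, this resolves the subject to `sensor-value`; the overrides replace those defaults.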

== How Iceberg modes translate to table format

Redpanda generates an Iceberg table with the same name as the topic. In each mode, Redpanda writes to a `redpanda` table column that stores a single Iceberg https://iceberg.apache.org/spec/#nested-types[struct^] per record, containing nested columns of the metadata from each record, including the record key, headers, timestamp, the partition it belongs to, and its offset.

For example, if you produce to a topic `ClickEvent` according to the following Avro schema:

[,avro]
----
{
  "type": "record",
  "name": "ClickEvent",
  "fields": [
    {
      "name": "user_id",
      "type": "int"
    },
    {
      "name": "event_type",
      "type": "string"
    },
    {
      "name": "ts",
      "type": "string"
    }
  ]
}
----

The `key_value` mode writes to the following table format:

[,sql]
----
CREATE TABLE ClickEvent (
    redpanda struct<
        partition: integer NOT NULL,
        timestamp: timestamp NOT NULL,
        offset: long NOT NULL,
        headers: array<struct<key: binary NOT NULL, value: binary>>,
        key: binary
    >,
    value binary
)
----

Use `key_value` mode if the topic data is in JSON, or if you are able to use the Iceberg data in its semi-structured format.
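
Because `key_value` mode keeps the value column as raw bytes, consumers of the table decode it client-side. A minimal sketch, assuming JSON topic data and a hypothetical row shape as a query engine might return it:

```python
import json

# A row from the key_value-mode ClickEvent table (hypothetical row shape,
# shown only for illustration; actual result objects vary by query engine).
row = {
    "redpanda": {"partition": 0, "offset": 7, "key": b"user-1"},
    "value": b'{"user_id": 1, "event_type": "click", "ts": "2025-01-01T00:00:00Z"}',
}

# The value column holds the record bytes as-is, so the consumer decodes them.
event = json.loads(row["value"])
```

This client-side decoding step is exactly what the schema-aware modes remove, since they store each field in its own typed column.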

The `value_schema_id_prefix` and `value_schema_latest` modes can use the schema to translate to the following table format:

[,sql]
----
CREATE TABLE ClickEvent (
    redpanda struct<
        partition: integer NOT NULL,
        timestamp: timestamp NOT NULL,
        offset: long NOT NULL,
        headers: array<struct<key: binary NOT NULL, value: binary>>,
        key: binary
    >,
    user_id integer NOT NULL,
    event_type string,
    ts string
)
----

As you produce records to the topic, the data also becomes available in object storage for Iceberg-compatible clients to consume. You can use the same analytical tools to xref:manage:iceberg/query-iceberg-topics.adoc[read the Iceberg topic data] in a data lake as you would for a relational database.

If Redpanda fails to translate the record to the columnar format defined by the schema, it writes the record to a dead-letter queue (DLQ) table. See xref:manage:iceberg/about-iceberg-topics.adoc#manage-dead-letter-queue[Manage dead-letter queue] for more information.

=== Schema types translation

Redpanda supports direct translations of the following types to Iceberg value domains:

[tabs]
======
Avro::
+
--
|===
| Avro type | Iceberg type

| boolean | boolean
| int | int
| long | long
| float | float
| double | double
| bytes | binary
| string | string
| record | struct
| array | list
| map | list
| fixed | fixed
| decimal | decimal
| uuid | uuid
| date | date
| time | time
| timestamp | timestamp
|===

* Different flavors of the time type (such as `time-millis`) and the timestamp type (such as `timestamp-millis`) are translated to the same Iceberg `time` and `timestamp` types, respectively.
* Avro unions are flattened to Iceberg structs with optional fields. For example:
** The union `["int", "long", "float"]` is represented as an Iceberg struct `struct<0 INT NULLABLE, 1 LONG NULLABLE, 2 FLOAT NULLABLE>`.
** The union `["int", null, "float"]` is represented as an Iceberg struct `struct<0 INT NULLABLE, 1 FLOAT NULLABLE>`.
* All fields are required by default. (Avro always sets a default in binary representation.)
* The Avro duration logical type is ignored.
* The Avro null type is ignored and not represented in the Iceberg schema.
* Recursive types are not supported.
--

Protobuf::
+
--
|===
| Protobuf type | Iceberg type

| bool | boolean
| double | double
| float | float
| int32 | int
| sint32 | int
| int64 | long
| sint64 | long
| sfixed32 | int
| sfixed64 | long
| string | string
| bytes | binary
| map | map
|===

* Repeated values are translated into Iceberg `array` types.
* Enums are translated into Iceberg `int` types based on the integer value of the enumerated type.
* `uint32` and `fixed32` are translated into Iceberg `long` types, as that is the existing semantic for unsigned 32-bit values in Iceberg.
* `uint64` and `fixed64` values are translated into their base-10 string representation.
* The `timestamp` type in Protobuf is translated into `timestamp` in Iceberg.
* Messages are converted into Iceberg structs.
* Recursive types are not supported.
--
======
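
The Avro union-flattening rule above can be modeled with a short function. This is an illustrative sketch of the documented mapping, not Redpanda's implementation:

```python
def flatten_union(union):
    """Map an Avro union to an Iceberg-style struct of optional fields.
    Each non-null branch becomes a nullable field named by its position
    among the non-null branches; the null branch is dropped entirely."""
    fields = []
    index = 0
    for branch in union:
        if branch is None or branch == "null":
            continue  # the Avro null type is not represented in the Iceberg schema
        fields.append(f"{index} {branch.upper()} NULLABLE")
        index += 1
    return f"struct<{', '.join(fields)}>"
```

Applied to the two unions listed above, this reproduces `struct<0 INT NULLABLE, 1 LONG NULLABLE, 2 FLOAT NULLABLE>` and `struct<0 INT NULLABLE, 1 FLOAT NULLABLE>`.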

// end::single-source[]

== See also

- xref:reference:properties/topic-properties.adoc#redpanda-iceberg-mode[`redpanda.iceberg.mode` topic property reference]

modules/manage/pages/iceberg/query-iceberg-topics.adoc

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@
 include::shared:partial$enterprise-license.adoc[]
 ====

-When you access Iceberg topics from a data lakehouse or other Iceberg-compatible tools, how you consume the data depends on the topic xref:manage:iceberg/about-iceberg-topics.adoc#enable-iceberg-integration[Iceberg mode] and whether you've registered a schema for the topic in the xref:manage:schema-reg/schema-reg-overview.adoc[Redpanda Schema Registry]. In either mode, you do not need to rely on complex ETL jobs or pipelines to access real-time data from Redpanda.
+When you access Iceberg topics from a data lakehouse or other Iceberg-compatible tools, how you consume the data depends on the topic xref:manage:iceberg/choose-iceberg-mode.adoc[Iceberg mode] and whether you've registered a schema for the topic in the xref:manage:schema-reg/schema-reg-overview.adoc[Redpanda Schema Registry]. You do not need to rely on complex ETL jobs or pipelines to access real-time data from Redpanda.

 include::manage:partial$iceberg/query-iceberg-topics.adoc[]

modules/manage/pages/schema-reg/schema-reg-overview.adoc

Lines changed: 34 additions & 2 deletions
@@ -4,7 +4,7 @@
 // tag::single-source[]
 :description: Redpanda's Schema Registry provides the interface to store and manage event schemas.

-In Redpanda, the messages exchanged between producers and consumers contain raw bytes. Schemas enable producers and consumers to share the information needed to serialize and deserialize those messages. They register and retrieve the schemas they use in the Schema Registry to ensure data verification.
+In Redpanda, the messages exchanged between producers and consumers contain raw bytes. Schemas enable producers and consumers to share the information needed to serialize and deserialize those messages. They register and retrieve the schemas they use in the Schema Registry to ensure data verification.

 Schemas are versioned, and the registry supports configurable compatibility modes between schema versions. When a producer or a consumer requests to register a schema change, the registry checks for schema compatibility and returns an error for an incompatible change. Compatibility modes can ensure that data flowing through a system is well-structured and easily evolves.

@@ -33,7 +33,39 @@ See the xref:reference:cluster-properties.adoc#kafka_nodelete_topics[kafka_nodel

 endif::[]

-Redpanda Schema Registry uses the default port 8081.
+Redpanda Schema Registry uses the default port 8081.
+
+== Wire format
+
+With Schema Registry, producers and consumers can use a specific message format, called the wire format. The wire format facilitates a seamless transfer of data by ensuring that clients easily access the correct schema in the Schema Registry for a message.
+
+The wire format is a sequence of bytes consisting of the following:
+
+. The "magic byte," a single byte that always contains the value of 0.
+. A four-byte integer containing the schema ID.
+. The rest of the serialized message.
+
+image::shared:schema-registry-wire-format.png[alt="Schema Registry wire format"]
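
The framing and checks described here can be sketched in a few lines. This is an illustrative model only, not an official Redpanda client:

```python
import struct

MAGIC_BYTE = 0  # the wire format's magic byte always contains 0

def frame(schema_id: int, payload: bytes) -> bytes:
    """Prefix a serialized payload with the magic byte and a big-endian
    four-byte schema ID, per the wire format described above."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload

def unframe(message: bytes) -> tuple[int, bytes]:
    """Check the magic byte, then return (schema_id, payload)."""
    if len(message) < 5:
        raise ValueError("message too short for the wire format")
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("message does not follow the wire format")
    return schema_id, message[5:]
```

For example, `frame(42, payload)` yields the five header bytes `00 00 00 00 2a` followed by the payload, and `unframe` rejects any message whose first byte is not 0, mirroring the deserializer behavior described below.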
+
+ifndef::env-cloud[]
+In the serialization process, the producer hands the message to a key/value serializer that is part of the respective language-specific SDK. The serializer first checks whether the schema ID for the given subject exists in the local schema cache. The serializer derives the subject name based on several xref:manage:schema-reg/schema-id-validation.adoc#set-subject-name-strategy-per-topic[strategies], such as the topic name. You can also explicitly set the subject name.
+endif::[]
+
+ifdef::env-cloud[]
+In the serialization process, the producer hands the message to a key/value serializer that is part of the respective language-specific SDK. The serializer first checks whether the schema ID for the given subject exists in the local schema cache. The serializer derives the subject name based on several strategies, such as the topic name. You can also explicitly set the subject name.
+endif::[]
+
+If the schema ID isn't in the cache, the serializer registers the schema in the Schema Registry and obtains the resulting schema ID from the response.
+
+In either case, once the serializer has the schema ID, it prefixes the message with the magic byte and the encoded schema ID, and returns the byte sequence to the producer to write to the topic.
+
+In the deserialization process, the consumer fetches messages from the broker and hands them to a deserializer. The deserializer first checks for the presence of the magic byte and rejects the message if it doesn't follow the wire format.
+
+The deserializer then reads the schema ID and checks whether that schema exists in its local cache. If it finds the schema, it deserializes the message according to that schema. Otherwise, the deserializer retrieves the schema from the Schema Registry using the schema ID and then proceeds with deserialization.
+
+ifndef::env-cloud[]
+You can also configure brokers to validate that producers use the wire format and that the schema exists (brokers do not validate the full payload). See xref:manage:schema-reg/schema-id-validation.adoc[] for more information.
+endif::[]

 == Schema examples