Merged
22 commits
98ea0e3 Add value_subject_latest mode (kbatuigas, Apr 8, 2025)
7efa089 Standardize Iceberg topic properties with the rest of the page (kbatuigas, Apr 8, 2025)
edea27d Fix xref (kbatuigas, Apr 8, 2025)
b1cd821 Clarify that key_value doesn't use a schema (kbatuigas, Apr 8, 2025)
a6519d0 Style edit per automated review (kbatuigas, Apr 8, 2025)
bb13c76 Name change (kbatuigas, Apr 8, 2025)
bc50015 Refactor Iceberg mode content (kbatuigas, Apr 9, 2025)
bdebe7e Cross-reference new page (kbatuigas, Apr 9, 2025)
7a9d836 Refactor Iceberg mode and partitioning property (kbatuigas, Apr 9, 2025)
b6914f5 Add standalone doc to nav tree (kbatuigas, Apr 9, 2025)
a29f2c4 Edits for clarity (kbatuigas, Apr 9, 2025)
b5e26af Rephrase explicit table creation (kbatuigas, Apr 9, 2025)
1b50f17 Merge branch 'main' into DOC-1140-Document-value_latest_schema-mode (kbatuigas, Apr 14, 2025)
172eebf Add wire format to Schema Reg docs and apply review suggestions (kbatuigas, Apr 16, 2025)
756eda7 Move schema types translation to new subsection in Modes doc (kbatuigas, Apr 16, 2025)
3e4e2ef Add more cross references (kbatuigas, Apr 16, 2025)
9759e8d Update modules/manage/pages/iceberg/choose-iceberg-mode.adoc (kbatuigas, Apr 16, 2025)
e9c6743 Example of TopicNameStrategy (kbatuigas, Apr 16, 2025)
f8b6619 Additional edits per SME review (kbatuigas, Apr 17, 2025)
4e3fca9 Apply suggestions from review (kbatuigas, Apr 18, 2025)
1136e9e Add note about querying in value_schema_latest mode (kbatuigas, Apr 18, 2025)
b0d933c Apply suggestions from automated review (kbatuigas, Apr 18, 2025)
1 change: 1 addition & 0 deletions modules/ROOT/nav.adoc
@@ -181,6 +181,7 @@
*** xref:manage:whole-cluster-restore.adoc[Whole Cluster Restore]
** xref:manage:iceberg/index.adoc[Iceberg]
*** xref:manage:iceberg/about-iceberg-topics.adoc[About Iceberg Topics]
*** xref:manage:iceberg/choose-iceberg-mode.adoc[Choose Iceberg Mode]
*** xref:manage:iceberg/use-iceberg-catalogs.adoc[Use Iceberg Catalogs]
*** xref:manage:iceberg/query-iceberg-topics.adoc[Query Iceberg Topics]
*** xref:manage:iceberg/redpanda-topics-iceberg-snowflake-catalog.adoc[Query Iceberg Topics with Snowflake]
8 changes: 7 additions & 1 deletion modules/get-started/pages/release-notes/redpanda.adoc
@@ -62,7 +62,13 @@ The admin panel has been removed from the Redpanda Console UI. To manage users,

== Iceberg improvements

xref:manage:iceberg/about-iceberg-topics.adoc[Iceberg-enabled topics] now support custom partitioning for improved query performance, snapshot expiry, and a dead-letter queue for invalid records. Schema evolution is also supported with schema mutations implemented according to the Iceberg specification.
Iceberg-enabled topics now support the following:

- xref:manage:iceberg/about-iceberg-topics.adoc#use-custom-partitioning[Custom partitioning] for improved query performance.
- Snapshot expiry.
- xref:manage:iceberg/about-iceberg-topics.adoc#manage-dead-letter-queue[Dead-letter queue] for invalid records.
🛠️ Refactor suggestion (Contributor): Link snapshot expiry to its documentation anchor.

The bullet for Snapshot expiry is missing a cross-reference. Please add an xref to the corresponding section in about-iceberg-topics.adoc (e.g., #snapshot-expiry) for consistency with the other bullets.
- xref:manage:iceberg/about-iceberg-topics.adoc#schema-evolution[Schema evolution], with schema mutations implemented according to the Iceberg specification.
- For Avro and Protobuf data, structured Iceberg tables without the use of the Schema Registry wire format or SerDes. See xref:manage:iceberg/choose-iceberg-mode.adoc[] for more information.

== Protobuf normalization in Schema Registry

231 changes: 231 additions & 0 deletions modules/manage/pages/iceberg/choose-iceberg-mode.adoc
@@ -0,0 +1,231 @@
= Choose an Iceberg Mode
:description: Learn about supported Iceberg modes and how you can integrate schemas with Iceberg topics.
:page-categories: Iceberg, Tiered Storage, Management, High Availability, Data Replication, Integration
:schema-id-val-doc: manage:schema-reg/schema-id-validation.adoc
// tag::single-source[]

ifndef::env-cloud[]
[NOTE]
====
include::shared:partial$enterprise-license.adoc[]
====
endif::[]

In xref:manage:iceberg/about-iceberg-topics.adoc#enable-iceberg-integration[Iceberg-enabled clusters], the `redpanda.iceberg.mode` topic property determines how Redpanda maps topic data to the Iceberg table structure. You can have the generated Iceberg table match the structure of an Avro or Protobuf schema in the Schema Registry, or you can use the `key_value` mode where Redpanda stores the record values as-is in the table.

NOTE: The JSON Schema format is not supported for Iceberg topics. If your topic data is in JSON, use the `key_value` mode.

== Supported Iceberg modes

ifndef::env-cloud[]
Redpanda supports the following xref:reference:properties/topic-properties.adoc#redpanda-iceberg-mode[modes] for Iceberg topics:
endif::[]

ifdef::env-cloud[]
Redpanda supports the following modes for Iceberg topics:
endif::[]

=== key_value

Creates an Iceberg table with a simple two-column schema: one column for the record metadata, including the key, and a binary column for the record's value.

=== value_schema_id_prefix

Creates an Iceberg table whose structure matches the Redpanda schema for the topic, with columns corresponding to each field. You must register a schema in the xref:manage:schema-reg/schema-reg-overview.adoc[Schema Registry] and producers must write to the topic using the Schema Registry wire format.

In the xref:manage:schema-reg/schema-reg-overview.adoc#serialization-and-deserialization[Schema Registry wire format], a "magic byte" and schema ID are embedded in the message payload header. Producers to the topic must use the wire format in the serialization process so Redpanda can determine the schema used for each record, use the schema to define the Iceberg table, and store the topic values in the corresponding table columns.
Contributor: no def/link for "magic byte"?

=== value_schema_latest

Creates an Iceberg table whose structure matches the latest schema registered for the subject in the Schema Registry. You must register a schema in the Schema Registry. Unlike the `value_schema_id_prefix` mode, `value_schema_latest` does not require that producers use the wire format.

The latest schema is cached periodically. The cache period is defined by the cluster property `iceberg_latest_schema_cache_ttl_ms` (default: 5 minutes).
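
Because this is a cluster property, changing it affects every Iceberg topic in `value_schema_latest` mode. For example, a sketch of explicitly setting the TTL to its 5-minute default (the value is in milliseconds):

[,bash]
----
rpk cluster config set iceberg_latest_schema_cache_ttl_ms 300000
----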

=== disabled

Default for `redpanda.iceberg.mode`. Disables writing to an Iceberg table for the topic.

== Configure Iceberg mode for a topic

You can set the Iceberg mode for a topic when you create the topic, or you can update the mode for an existing topic.

.Option 1. Create a new topic and set `redpanda.iceberg.mode`:
[,bash]
----
rpk topic create <topic-name> --topic-config=redpanda.iceberg.mode=<iceberg-mode>
----

.Option 2. Set `redpanda.iceberg.mode` for an existing topic:
[,bash]
----
rpk topic alter-config <new-topic-name> --set redpanda.iceberg.mode=<iceberg-mode>
----

[[override-value-schema-latest-default]]
=== Override `value_schema_latest` default

ifndef::env-cloud[]
In `value_schema_latest` mode, you only need to set the property value to the string `value_schema_latest`. This enables the default behavior of `value_schema_latest` mode, which determines the subject for the topic using the xref:manage:schema-reg/schema-id-validation.adoc#set-subject-name-strategy-per-topic[TopicNameStrategy]. For example, if your topic is named `sensor`, the schema is looked up in the `sensor-value` subject. For Protobuf data, the default behavior also deserializes records using the first message defined in the corresponding Protobuf schema stored in the Schema Registry.
endif::[]

ifdef::env-cloud[]
In `value_schema_latest` mode, you only need to set the property value to the string `value_schema_latest`. This enables the default behavior of `value_schema_latest` mode, which determines the subject for the topic using the TopicNameStrategy. For example, if your topic is named `sensor`, the schema is looked up in the `sensor-value` subject. For Protobuf data, the default behavior also deserializes records using the first message defined in the corresponding Protobuf schema stored in the Schema Registry.
endif::[]

If you use a strategy other than the topic name to derive the subject name, you can override the default behavior of `value_schema_latest` mode and explicitly set the subject name.

To override the default behavior, use the following optional syntax:

[,bash]
----
value_schema_latest:subject=<subject-name>,protobuf_name=<protobuf-message-full-name>
----

* For both Avro and Protobuf, specify a different subject name by using the key-value pair `subject=<subject-name>`, for example `value_schema_latest:subject=sensor-data`.
* For Protobuf only:
** Specify a different message definition by using a key-value pair `protobuf_name=<message-name>`, for example: `value_schema_latest:protobuf_name=com.example.manufacturing.SensorData`.
** To specify both a different subject and message definition, separate the key-value pairs with a comma, for example: `value_schema_latest:subject=my_protobuf_schema,protobuf_name=com.example.manufacturing.SensorData`.
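
For example, a hypothetical command applying both overrides to an existing topic named `sensor` (quoting the value keeps the shell from splitting on the comma):

[,bash]
----
rpk topic alter-config sensor --set "redpanda.iceberg.mode=value_schema_latest:subject=my_protobuf_schema,protobuf_name=com.example.manufacturing.SensorData"
----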

== How Iceberg modes translate to table format

Redpanda generates an Iceberg table with the same name as the topic. In each mode, Redpanda writes to a `redpanda` table column that stores a single Iceberg https://iceberg.apache.org/spec/#nested-types[struct^] per record, containing nested columns of the metadata from each record, including the record key, headers, timestamp, the partition it belongs to, and its offset.

For example, if you produce to a topic `ClickEvent` according to the following Avro schema:

[,avro]
----
{
"type": "record",
"name": "ClickEvent",
"fields": [
{
"name": "user_id",
"type": "int"
},
{
"name": "event_type",
"type": "string"
},
{
"name": "ts",
"type": "string"
}
]
}
----

The `key_value` mode writes to the following table format:

[,sql]
----
CREATE TABLE ClickEvent (
redpanda struct<
partition: integer NOT NULL,
timestamp: timestamp NOT NULL,
offset: long NOT NULL,
headers: array<struct<key: binary NOT NULL, value: binary>>,
key: binary
>,
value binary
)
----

Use `key_value` mode if the topic data is in JSON, or if you can work with the Iceberg data in its semi-structured format.
Contributor: I had to read this sentence 3-4 times, and am not 100% clear on its meaning. Suggested rephrasings:
Use key_value mode if the topic data is in JSON, or if you can, use the Iceberg data in its semi-structured format.
Use key_value mode if the topic data is in JSON, or the Iceberg data in its semi-structured format.

Contributor Author: Rephrased

The `value_schema_id_prefix` and `value_schema_latest` modes use the schema to translate records to the following table format:

[,sql]
----
CREATE TABLE ClickEvent (
redpanda struct<
partition: integer NOT NULL,
timestamp: timestamp NOT NULL,
offset: long NOT NULL,
headers: array<struct<key: binary NOT NULL, value: binary>>,
key: binary
>,
user_id integer NOT NULL,
event_type string,
ts string
)
----

As you produce records to the topic, the data also becomes available in object storage for Iceberg-compatible clients to consume. You can use the same analytical tools to xref:manage:iceberg/query-iceberg-topics.adoc[read the Iceberg topic data] in a data lake as you would for a relational database.
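
For example, once the `ClickEvent` table above is populated, an Iceberg-compatible engine could query it directly. This is a sketch only: struct field access syntax varies by query engine, and the `event_type` value is illustrative.

[,sql]
----
-- Project record metadata alongside schema-derived columns
SELECT redpanda.timestamp AS event_time, user_id, event_type
FROM ClickEvent
WHERE event_type = 'click';
----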

If Redpanda fails to translate the record to the columnar format as defined by the schema, it writes the record to a dead-letter queue (DLQ) table. See xref:manage:iceberg/about-iceberg-topics.adoc#manage-dead-letter-queue[Manage dead-letter queue] for more information.

=== Schema types translation

Redpanda supports direct translations of the following types to Iceberg value domains:

[tabs]
======
Avro::
+
--
|===
| Avro type | Iceberg type
| boolean | boolean
| int | int
| long | long
| float | float
| double | double
| bytes | binary
| string | string
| record | struct
| array | list
| maps | list
⚠️ Potential issue (Contributor): Fix Avro map mapping.

The table lists Avro maps → Iceberg list, but Iceberg supports native map types. Please update to:

-| maps    | list
+| maps    | map

| fixed | fixed
| decimal | decimal
| uuid | uuid
| date | date
| time | time
| timestamp | timestamp
|===
* Different flavors of time (such as `time-millis`) and timestamp (such as `timestamp-millis`) types are translated to the same Iceberg `time` and `timestamp` types, respectively.
* Avro unions are flattened to Iceberg structs with optional fields. For example:
** The union `["int", "long", "float"]` is represented as an Iceberg struct `struct<0 INT NULLABLE, 1 LONG NULLABLE, 2 FLOAT NULLABLE>`.
** The union `["int", null, "float"]` is represented as an Iceberg struct `struct<0 INT NULLABLE, 1 FLOAT NULLABLE>`.
* All fields are required by default. (Avro always sets a default in binary representation.)
* The Avro duration logical type is ignored.
* The Avro null type is ignored and not represented in the Iceberg schema.
* Recursive types are not supported.
--
Protobuf::
+
--
|===
| Protobuf type | Iceberg type
| bool | boolean
| double | double
| float | float
| int32 | int
| sint32 | int
| int64 | long
| sint64 | long
| sfixed32 | int
| sfixed64 | int
| string | string
Comment on lines +211 to +212
⚠️ Potential issue (Contributor): Correct Protobuf fixed‑width mapping.

Currently sfixed64 is mapped to int; it should map to long.

-| sfixed64 | int
+| sfixed64 | long

| bytes | binary
| map | map
|===
* Repeated values are translated into Iceberg `array` types.
* Enums are translated into Iceberg `int` types based on the integer value of the enumerated type.
* `uint32` and `fixed32` are translated into Iceberg `long` types as that is the existing semantic for unsigned 32-bit values in Iceberg.
* `uint64` and `fixed64` values are translated into their Base-10 string representation.
* The `timestamp` type in Protobuf is translated into `timestamp` in Iceberg.
* Messages are converted into Iceberg structs.
* Recursive types are not supported.
--
======

// end::single-source[]

== See also

- xref:reference:properties/topic-properties.adoc#redpanda-iceberg-mode[`redpanda.iceberg.mode` topic property reference]
2 changes: 1 addition & 1 deletion modules/manage/pages/iceberg/query-iceberg-topics.adoc
@@ -7,7 +7,7 @@
include::shared:partial$enterprise-license.adoc[]
====

When you access Iceberg topics from a data lakehouse or other Iceberg-compatible tools, how you consume the data depends on the topic xref:manage:iceberg/about-iceberg-topics.adoc#enable-iceberg-integration[Iceberg mode] and whether you've registered a schema for the topic in the xref:manage:schema-reg/schema-reg-overview.adoc[Redpanda Schema Registry]. In either mode, you do not need to rely on complex ETL jobs or pipelines to access real-time data from Redpanda.
When you access Iceberg topics from a data lakehouse or other Iceberg-compatible tools, how you consume the data depends on the topic xref:manage:iceberg/choose-iceberg-mode.adoc[Iceberg mode] and whether you've registered a schema for the topic in the xref:manage:schema-reg/schema-reg-overview.adoc[Redpanda Schema Registry]. You do not need to rely on complex ETL jobs or pipelines to access real-time data from Redpanda.

Contributor: @kbatuigas this page has examples using the other mode but no mention of "value_schema_latest" mode at all. Even if we don't have an example it's probably worth a mention how querying works in this mode (essentially the same as with value_schema_id_prefix)

Contributor Author: Added note


include::manage:partial$iceberg/query-iceberg-topics.adoc[]

36 changes: 34 additions & 2 deletions modules/manage/pages/schema-reg/schema-reg-overview.adoc
@@ -4,7 +4,7 @@
// tag::single-source[]
:description: Redpanda's Schema Registry provides the interface to store and manage event schemas.

In Redpanda, the messages exchanged between producers and consumers contain raw bytes. Schemas enable producers and consumers to share the information needed to serialize and deserialize those messages. They register and retrieve the schemas they use in the Schema Registry to ensure data verification.
In Redpanda, the messages exchanged between producers and consumers contain raw bytes. Schemas enable producers and consumers to share the information needed to <<serialization-and-deserialization,serialize and deserialize>> those messages. They register and retrieve the schemas they use in the Schema Registry to ensure data verification.

Schemas are versioned, and the registry supports configurable compatibility modes between schema versions. When a producer or a consumer requests to register a schema change, the registry checks for schema compatibility and returns an error for an incompatible change. Compatibility modes can ensure that data flowing through a system is well-structured and easily evolves.

Expand Down Expand Up @@ -33,7 +33,39 @@ See the xref:reference:cluster-properties.adoc#kafka_nodelete_topics[kafka_nodel

endif::[]

Redpanda Schema Registry uses the default port 8081.

== Wire format

With Schema Registry, producers and consumers can use a specific message format, called the wire format. The wire format ensures seamless data transfer by enabling clients to identify the correct schema in the Schema Registry for each message.

The wire format is a sequence of bytes consisting of the following:

. The "magic byte," a single byte that always contains the value of 0.
Contributor: oh good--you defined it here. thx

. A four-byte integer containing the schema ID.
Contributor: technically for protobuf there is additionally a series of varints encoding which protobuf message in the protobuf schema was used. I don't feel strongly about whether we need to call that out, however.

. The rest of the serialized message.

image::shared:schema-registry-wire-format.png[alt="Schema Registry wire format"]

ifndef::env-cloud[]
In the serialization process, the producer hands over the message to a key/value serializer that is part of the respective language-specific SDK. The serializer first checks whether the schema ID for the given subject exists in the local schema cache. The serializer derives the subject name based on several xref:manage:schema-reg/schema-id-validation.adoc#set-subject-name-strategy-per-topic[strategies], such as the topic name. You can also explicitly set the subject name.
endif::[]

ifdef::env-cloud[]
In the serialization process, the producer hands over the message to a key/value serializer that is part of the respective language-specific SDK. The serializer first checks whether the schema ID for the given subject exists in the local schema cache. The serializer derives the subject name based on several strategies, such as the topic name. You can also explicitly set the subject name.
endif::[]

If the schema ID isn't in the cache, the serializer registers the schema in the Schema Registry and obtains the resulting schema ID from the response.

In either case, when the serializer has the schema ID, it pads the beginning of the message with the magic byte and the encoded schema ID, and returns the byte sequence to the producer to write to the topic.
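
The framing described above can be sketched in a few lines. This illustrates the byte layout only, not any particular SDK's serializer; for Protobuf, additional message-index bytes may follow the schema ID.

```python
import struct

MAGIC_BYTE = 0  # always 0 in the Schema Registry wire format


def frame(schema_id: int, serialized: bytes) -> bytes:
    """Pad a serialized message with the magic byte and a big-endian 4-byte schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + serialized


def unframe(message: bytes) -> tuple[int, bytes]:
    """Check the magic byte, then return (schema_id, payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("message does not follow the wire format")
    return schema_id, message[5:]
```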

In the deserialization process, the consumer fetches messages from the broker and hands them over to a deserializer. The deserializer first checks the presence of the magic byte and rejects the message if it doesn't follow the wire format.

The deserializer then reads the schema ID and checks whether that schema exists in its local cache. If it finds the schema, it deserializes the message according to that schema. Otherwise, the deserializer retrieves the schema from the Schema Registry using the schema ID, then the deserializer proceeds with deserialization.

ifndef::env-cloud[]
You can also configure brokers to validate that producers use the wire format and the schema exists (but brokers do not validate the full payload). See xref:manage:schema-reg/schema-id-validation.adoc[] for more information.
endif::[]

== Schema examples
