From 1471a798dcf5bd9fafdfd22ca708fa171b5e42d3 Mon Sep 17 00:00:00 2001 From: Harish Vishwanath Date: Fri, 9 Jan 2026 10:56:12 -0800 Subject: [PATCH] Use dynamic version linking for Javadoc and generated static includes --- docs/apis/_index.md | 12 +-- docs/configuration/admin-configs.md | 2 +- docs/configuration/broker-configs.md | 2 +- docs/configuration/configuration-providers.md | 8 +- docs/configuration/consumer-configs.md | 2 +- docs/configuration/group-configs.md | 2 +- docs/configuration/kafka-connect-configs.md | 6 +- docs/configuration/kafka-streams-configs.md | 2 +- docs/configuration/mirrormaker-configs.md | 8 +- docs/configuration/producer-configs.md | 2 +- docs/configuration/tiered-storage-configs.md | 4 +- docs/configuration/topic-configs.md | 2 +- docs/design/protocol.md | 8 +- docs/getting-started/upgrade.md | 4 +- docs/kafka-connect/user-guide.md | 4 +- docs/operations/monitoring.md | 6 +- .../streams/developer-guide/app-reset-tool.md | 2 +- .../streams/developer-guide/config-streams.md | 30 +++---- docs/streams/developer-guide/dsl-api.md | 90 +++++++++---------- .../developer-guide/interactive-queries.md | 6 +- docs/streams/developer-guide/processor-api.md | 26 +++--- 21 files changed, 114 insertions(+), 114 deletions(-) diff --git a/docs/apis/_index.md b/docs/apis/_index.md index b5f4760fe4013..428c95f0dd128 100644 --- a/docs/apis/_index.md +++ b/docs/apis/_index.md @@ -40,7 +40,7 @@ Kafka exposes all its functionality over a language-independent protocol which h The Producer API allows applications to send streams of data to topics in the Kafka cluster. -Examples of using the producer are shown in the [javadocs](/43/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html "Kafka 4.3 Javadoc"). +Examples of using the producer are shown in the [javadocs](/{version}/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html "Kafka 4.3 Javadoc"). To use the producer, add the following Maven dependency to your project: @@ -55,7 +55,7 @@ To use the producer, add the following Maven dependency to your project: The Consumer API allows applications to read streams of data from topics in the Kafka cluster. -Examples of using the consumer are shown in the [javadocs](/43/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html "Kafka 4.3 Javadoc"). +Examples of using the consumer are shown in the [javadocs](/{version}/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html "Kafka 4.3 Javadoc"). To use the consumer, add the following Maven dependency to your project: @@ -70,7 +70,7 @@ To use the consumer, add the following Maven dependency to your project: The Share Consumer API enables applications in a share group to cooperatively consume and process data from Kafka topics. -Examples of using the share consumer are shown in the [javadocs](/43/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaShareConsumer.html "Kafka 4.3 Javadoc"). +Examples of using the share consumer are shown in the [javadocs](/{version}/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaShareConsumer.html "Kafka 4.3 Javadoc"). To use the share consumer, add the following Maven dependency to your project: @@ -85,7 +85,7 @@ To use the share consumer, add the following Maven dependency to your project: The [Streams](/43/documentation/streams) API allows transforming streams of data from input topics to output topics. 
-Examples of using this library are shown in the [javadocs](/43/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html "Kafka 4.3 Javadoc"). +Examples of using this library are shown in the [javadocs](/{version}/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html "Kafka 4.3 Javadoc"). Additional documentation on using the Streams API is available [here](/43/documentation/streams). @@ -115,7 +115,7 @@ The Connect API allows implementing connectors that continually pull from some s Many users of Connect won't need to use this API directly, though, they can use pre-built connectors without needing to write any code. Additional information on using Connect is available [here](/documentation.html#connect). -Those who want to implement custom connectors can see the [javadoc](/43/javadoc/index.html?org/apache/kafka/connect "Kafka 4.3 Javadoc"). +Those who want to implement custom connectors can see the [javadoc](/{version}/javadoc/index.html?org/apache/kafka/connect "Kafka 4.3 Javadoc"). # Admin API @@ -130,4 +130,4 @@ To use the Admin API, add the following Maven dependency to your project: 4.3.0 -For more information about the Admin APIs, see the [javadoc](/43/javadoc/index.html?org/apache/kafka/clients/admin/Admin.html "Kafka 4.3 Javadoc"). +For more information about the Admin APIs, see the [javadoc](/{version}/javadoc/index.html?org/apache/kafka/clients/admin/Admin.html "Kafka 4.3 Javadoc"). diff --git a/docs/configuration/admin-configs.md b/docs/configuration/admin-configs.md index 194362668a176..8ab0d94de79e9 100644 --- a/docs/configuration/admin-configs.md +++ b/docs/configuration/admin-configs.md @@ -26,4 +26,4 @@ type: docs --> -Below is the Kafka Admin client library configuration. {{< include-html file="/static/43/generated/admin_client_config.html" >}} +Below is the Kafka Admin client library configuration. {{< include-html file="/static/{version}/generated/admin_client_config.html" >}} diff --git a/docs/configuration/broker-configs.md b/docs/configuration/broker-configs.md index 4b3a1a2fb978b..86b85f74ac8b8 100644 --- a/docs/configuration/broker-configs.md +++ b/docs/configuration/broker-configs.md @@ -34,7 +34,7 @@ The essential configurations are the following: * `controller.quorum.bootstrap.servers` * `controller.listener.names` -Broker configurations and defaults are discussed in more detail below. {{< include-html file="/static/43/generated/kafka_config.html" >}} +Broker configurations and defaults are discussed in more detail below. {{< include-html file="/static/{version}/generated/kafka_config.html" >}} More details about broker configuration can be found in the scala class `kafka.server.KafkaConfig`. diff --git a/docs/configuration/configuration-providers.md b/docs/configuration/configuration-providers.md index 13a81625e1435..5a6eb24e6a7fe 100644 --- a/docs/configuration/configuration-providers.md +++ b/docs/configuration/configuration-providers.md @@ -30,11 +30,11 @@ Use configuration providers to load configuration data from external sources. Th You have the following options: - * Use a custom provider by creating a class implementing the [`ConfigProvider`](/43/javadoc/org/apache/kafka/common/config/provider/ConfigProvider.html) interface and packaging it into a JAR file. + * Use a custom provider by creating a class implementing the [`ConfigProvider`](/{version}/javadoc/org/apache/kafka/common/config/provider/ConfigProvider.html) interface and packaging it into a JAR file. 
* Use a built-in provider: - * [`DirectoryConfigProvider`](/43/javadoc/org/apache/kafka/common/config/provider/DirectoryConfigProvider.html) - * [`EnvVarConfigProvider`](/43/javadoc/org/apache/kafka/common/config/provider/EnvVarConfigProvider.html) - * [`FileConfigProvider`](/43/javadoc/org/apache/kafka/common/config/provider/FileConfigProvider.html) + * [`DirectoryConfigProvider`](/{version}/javadoc/org/apache/kafka/common/config/provider/DirectoryConfigProvider.html) + * [`EnvVarConfigProvider`](/{version}/javadoc/org/apache/kafka/common/config/provider/EnvVarConfigProvider.html) + * [`FileConfigProvider`](/{version}/javadoc/org/apache/kafka/common/config/provider/FileConfigProvider.html) diff --git a/docs/configuration/consumer-configs.md b/docs/configuration/consumer-configs.md index 878d5573141d7..0d2db61930e6c 100644 --- a/docs/configuration/consumer-configs.md +++ b/docs/configuration/consumer-configs.md @@ -26,4 +26,4 @@ type: docs --> -Below is the consumer and share consumer configuration: {{< include-html file="/static/43/generated/consumer_config.html" >}} +Below is the consumer and share consumer configuration: {{< include-html file="/static/{version}/generated/consumer_config.html" >}} diff --git a/docs/configuration/group-configs.md b/docs/configuration/group-configs.md index 4937b80e9924a..ac9cd65b28c61 100644 --- a/docs/configuration/group-configs.md +++ b/docs/configuration/group-configs.md @@ -26,4 +26,4 @@ type: docs --> -Below is the group configuration: {{< include-html file="/static/43/generated/group_config.html" >}} +Below is the group configuration: {{< include-html file="/static/{version}/generated/group_config.html" >}} diff --git a/docs/configuration/kafka-connect-configs.md b/docs/configuration/kafka-connect-configs.md index 9e098e9d8c005..8d56f70f5ce6e 100644 --- a/docs/configuration/kafka-connect-configs.md +++ b/docs/configuration/kafka-connect-configs.md @@ -26,12 +26,12 @@ type: docs --> -Below is the Kafka Connect framework configuration. {{< include-html file="/static/43/generated/connect_config.html" >}} +Below is the Kafka Connect framework configuration. {{< include-html file="/static/{version}/generated/connect_config.html" >}} ## Source Connector Configs -Below is the source connector configuration. {{< include-html file="/static/43/generated/source_connector_config.html" >}} +Below is the source connector configuration. {{< include-html file="/static/{version}/generated/source_connector_config.html" >}} ## Sink Connector Configs -Below is the sink connector configuration. {{< include-html file="/static/43/generated/sink_connector_config.html" >}} +Below is the sink connector configuration. {{< include-html file="/static/{version}/generated/sink_connector_config.html" >}} diff --git a/docs/configuration/kafka-streams-configs.md b/docs/configuration/kafka-streams-configs.md index 53b8ddc702251..6c0b656bd7128 100644 --- a/docs/configuration/kafka-streams-configs.md +++ b/docs/configuration/kafka-streams-configs.md @@ -26,4 +26,4 @@ type: docs --> -Below is the Kafka Streams client library configuration. {{< include-html file="/static/43/generated/streams_config.html" >}} +Below is the Kafka Streams client library configuration. 
{{< include-html file="/static/{version}/generated/streams_config.html" >}} diff --git a/docs/configuration/mirrormaker-configs.md b/docs/configuration/mirrormaker-configs.md index 5577abd1a06ce..ac8c82f91bcb3 100644 --- a/docs/configuration/mirrormaker-configs.md +++ b/docs/configuration/mirrormaker-configs.md @@ -30,16 +30,16 @@ Below is the configuration of the connectors that make up MirrorMaker 2. ## MirrorMaker Common Configs -Below is the common configuration that applies to all three connectors. {{< include-html file="/static/43/generated/mirror_connector_config.html" >}} +Below is the common configuration that applies to all three connectors. {{< include-html file="/static/{version}/generated/mirror_connector_config.html" >}} ## MirrorMaker Source Configs -Below is the configuration of MirrorMaker 2 source connector for replicating topics. {{< include-html file="/static/43/generated/mirror_source_config.html" >}} +Below is the configuration of MirrorMaker 2 source connector for replicating topics. {{< include-html file="/static/{version}/generated/mirror_source_config.html" >}} ## MirrorMaker Checkpoint Configs -Below is the configuration of MirrorMaker 2 checkpoint connector for emitting consumer offset checkpoints. {{< include-html file="/static/43/generated/mirror_checkpoint_config.html" >}} +Below is the configuration of MirrorMaker 2 checkpoint connector for emitting consumer offset checkpoints. {{< include-html file="/static/{version}/generated/mirror_checkpoint_config.html" >}} ## MirrorMaker HeartBeat Configs -Below is the configuration of MirrorMaker 2 heartbeat connector for checking connectivity between connectors and clusters. {{< include-html file="/static/43/generated/mirror_heartbeat_config.html" >}} +Below is the configuration of MirrorMaker 2 heartbeat connector for checking connectivity between connectors and clusters. {{< include-html file="/static/{version}/generated/mirror_heartbeat_config.html" >}} diff --git a/docs/configuration/producer-configs.md b/docs/configuration/producer-configs.md index 5997439796590..589c54d357640 100644 --- a/docs/configuration/producer-configs.md +++ b/docs/configuration/producer-configs.md @@ -26,4 +26,4 @@ type: docs --> -Below is the producer configuration: {{< include-html file="/static/43/generated/producer_config.html" >}} +Below is the producer configuration: {{< include-html file="/static/{version}/generated/producer_config.html" >}} diff --git a/docs/configuration/tiered-storage-configs.md b/docs/configuration/tiered-storage-configs.md index dc9e62a0dd96c..ad1e6f43d06a0 100644 --- a/docs/configuration/tiered-storage-configs.md +++ b/docs/configuration/tiered-storage-configs.md @@ -26,7 +26,7 @@ type: docs --> -Below is the Tiered Storage configuration. {{< include-html file="/static/43/generated/remote_log_manager_config.html" >}} +Below is the Tiered Storage configuration. {{< include-html file="/static/{version}/generated/remote_log_manager_config.html" >}} ## RLMM Configs @@ -34,7 +34,7 @@ Below is the configuration for `TopicBasedRemoteLogMetadataManager`, which is th All configurations here should start with the prefix defined by `remote.log.metadata.manager.impl.prefix`, for example, `rlmm.config.remote.log.metadata.consume.wait.ms`. 
-{{< include-html file="/static/43/generated/remote_log_metadata_manager_config.html" >}} +{{< include-html file="/static/{version}/generated/remote_log_metadata_manager_config.html" >}} The implementation of `TopicBasedRemoteLogMetadataManager` needs to create admin, producer, and consumer clients for the internal topic `__remote_log_metadata`. diff --git a/docs/configuration/topic-configs.md b/docs/configuration/topic-configs.md index d1f5f9fb644c2..d95936c002370 100644 --- a/docs/configuration/topic-configs.md +++ b/docs/configuration/topic-configs.md @@ -49,4 +49,4 @@ To remove an override you can do $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes -Below is the topic configuration. The server's default configuration for this property is given under the Server Default Property heading. A given server default config value only applies to a topic if it does not have an explicit topic config override. {{< include-html file="/static/43/generated/topic_config.html" >}} +Below is the topic configuration. The server's default configuration for this property is given under the Server Default Property heading. A given server default config value only applies to a topic if it does not have an explicit topic config override. {{< include-html file="/static/{version}/generated/topic_config.html" >}} diff --git a/docs/design/protocol.md b/docs/design/protocol.md index c2a60e2807b26..31fa16b2a6635 100644 --- a/docs/design/protocol.md +++ b/docs/design/protocol.md @@ -136,7 +136,7 @@ For interoperability with 0.9.0.x clients, the first packet received by the serv The protocol is built out of the following primitive types. -{{< include-html file="/static/43/generated/protocol_types.html" >}} +{{< include-html file="/static/{version}/generated/protocol_types.html" >}} ### Notes on reading the request format grammars @@ -184,13 +184,13 @@ A description of the record batch format can be found [here](/documentation/#rec We use numeric codes to indicate what problem occurred on the server. These can be translated by the client into exceptions or whatever the appropriate error handling mechanism in the client language. Here is a table of the error codes currently in use: -{{< include-html file="/static/43/generated/protocol_errors.html" >}} +{{< include-html file="/static/{version}/generated/protocol_errors.html" >}} ### Api Keys The following are the numeric codes that the stable ApiKey in the request can take for each of the below request types. -{{< include-html file="/static/43/generated/protocol_api_keys.html" >}} +{{< include-html file="/static/{version}/generated/protocol_api_keys.html" >}} ## The Messages @@ -204,7 +204,7 @@ The message consists of the header and body: `RequestOrResponseHeader` is the versioned request or response header. `Body` is the message-specific body. -{{< include-html file="/static/43/generated/protocol_messages.html" >}} +{{< include-html file="/static/{version}/generated/protocol_messages.html" >}} ## Some Common Philosophical Questions diff --git a/docs/getting-started/upgrade.md b/docs/getting-started/upgrade.md index 58344a7ebd263..a92666cf72c55 100644 --- a/docs/getting-started/upgrade.md +++ b/docs/getting-started/upgrade.md @@ -196,14 +196,14 @@ Note: Apache Kafka 4.0 only supports KRaft mode - ZooKeeper mode has been remove * The `topics.blacklist` was removed from the `org.apache.kafka.connect.mirror.MirrorSourceConfig` Please use `topics.exclude` instead. 
* The `groups.blacklist` was removed from the `org.apache.kafka.connect.mirror.MirrorSourceConfig` Please use `groups.exclude` instead. * **Tools** - * The `kafka.common.MessageReader` class was removed. Please use the [`org.apache.kafka.tools.api.RecordReader`](/43/javadoc/org/apache/kafka/tools/api/RecordReader.html) interface to build custom readers for the `kafka-console-producer` tool. + * The `kafka.common.MessageReader` class was removed. Please use the [`org.apache.kafka.tools.api.RecordReader`](/{version}/javadoc/org/apache/kafka/tools/api/RecordReader.html) interface to build custom readers for the `kafka-console-producer` tool. * The `kafka.tools.DefaultMessageFormatter` class was removed. Please use the `org.apache.kafka.tools.consumer.DefaultMessageFormatter` class instead. * The `kafka.tools.LoggingMessageFormatter` class was removed. Please use the `org.apache.kafka.tools.consumer.LoggingMessageFormatter` class instead. * The `kafka.tools.NoOpMessageFormatter` class was removed. Please use the `org.apache.kafka.tools.consumer.NoOpMessageFormatter` class instead. * The `--whitelist` option was removed from the `kafka-console-consumer` command line tool. Please use `--include` instead. * Redirections from the old tools packages have been removed: `kafka.admin.FeatureCommand`, `kafka.tools.ClusterTool`, `kafka.tools.EndToEndLatency`, `kafka.tools.StateChangeLogMerger`, `kafka.tools.StreamsResetter`, `kafka.tools.JmxTool`. * The `--authorizer`, `--authorizer-properties`, and `--zk-tls-config-file` options were removed from the `kafka-acls` command line tool. Please use `--bootstrap-server` or `--bootstrap-controller` instead. - * The `kafka.serializer.Decoder` trait was removed, please use the [`org.apache.kafka.tools.api.Decoder`](/43/javadoc/org/apache/kafka/tools/api/Decoder.html) interface to build custom decoders for the `kafka-dump-log` tool. + * The `kafka.serializer.Decoder` trait was removed, please use the [`org.apache.kafka.tools.api.Decoder`](/{version}/javadoc/org/apache/kafka/tools/api/Decoder.html) interface to build custom decoders for the `kafka-dump-log` tool. * The `kafka.coordinator.group.OffsetsMessageFormatter` class was removed. Please use the `org.apache.kafka.tools.consumer.OffsetsMessageFormatter` class instead. * The `kafka.coordinator.group.GroupMetadataMessageFormatter` class was removed. Please use the `org.apache.kafka.tools.consumer.GroupMetadataMessageFormatter` class instead. * The `kafka.coordinator.transaction.TransactionLogMessageFormatter` class was removed. Please use the `org.apache.kafka.tools.consumer.TransactionLogMessageFormatter` class instead. 
diff --git a/docs/kafka-connect/user-guide.md b/docs/kafka-connect/user-guide.md index 2dda66a3f96e1..64b6af9f78c87 100644 --- a/docs/kafka-connect/user-guide.md +++ b/docs/kafka-connect/user-guide.md @@ -186,7 +186,7 @@ Several widely-applicable data and routing transformations are included with Kaf Details on how to configure each transformation are listed below: -{{< include-html file="/static/43/generated/connect_transforms.html" >}} +{{< include-html file="/static/{version}/generated/connect_transforms.html" >}} ### Predicates @@ -249,7 +249,7 @@ Kafka Connect includes the following predicates: Details on how to configure each predicate are listed below: -{{< include-html file="/static/43/generated/connect_predicates.html" >}} +{{< include-html file="/static/{version}/generated/connect_predicates.html" >}} ## REST API diff --git a/docs/operations/monitoring.md b/docs/operations/monitoring.md index f05b04c4be0de..746a74ac6bdd7 100644 --- a/docs/operations/monitoring.md +++ b/docs/operations/monitoring.md @@ -3312,7 +3312,7 @@ kafka.producer:type=producer-metrics,client-id=([-.\w]+) ### Producer Sender Metrics -{{< include-html file="/static/43/generated/producer_metrics.html" >}} +{{< include-html file="/static/{version}/generated/producer_metrics.html" >}} ## Consumer monitoring @@ -3832,11 +3832,11 @@ kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) ### Consumer Fetch Metrics -{{< include-html file="/static/43/generated/consumer_metrics.html" >}} +{{< include-html file="/static/{version}/generated/consumer_metrics.html" >}} ## Connect Monitoring -A Connect worker process contains all the producer and consumer metrics as well as metrics specific to Connect. The worker process itself has a number of metrics, while each connector and task have additional metrics. {{< include-html file="/static/43/generated/connect_metrics.html" >}} +A Connect worker process contains all the producer and consumer metrics as well as metrics specific to Connect. The worker process itself has a number of metrics, while each connector and task have additional metrics. {{< include-html file="/static/{version}/generated/connect_metrics.html" >}} ## Streams Monitoring diff --git a/docs/streams/developer-guide/app-reset-tool.md b/docs/streams/developer-guide/app-reset-tool.md index bc69cff10c808..9a466fc14e540 100644 --- a/docs/streams/developer-guide/app-reset-tool.md +++ b/docs/streams/developer-guide/app-reset-tool.md @@ -132,7 +132,7 @@ All the other parameters can be combined as needed. For example, if you want to For a complete application reset, you must delete the application's local state directory on any machines where the application instance was run. You must do this before restarting an application instance on the same machine. You can use either of these methods: * The API method `KafkaStreams#cleanUp()` in your application code. - * Manually delete the corresponding local state directory (default location: `/${java.io.tmpdir}/kafka-streams/`). For more information, see [Streams](/43/javadoc/org/apache/kafka/streams/StreamsConfig.html#STATE_DIR_CONFIG) javadocs. + * Manually delete the corresponding local state directory (default location: `/${java.io.tmpdir}/kafka-streams/`). For more information, see [Streams](/{version}/javadoc/org/apache/kafka/streams/StreamsConfig.html#STATE_DIR_CONFIG) javadocs. 
diff --git a/docs/streams/developer-guide/config-streams.md b/docs/streams/developer-guide/config-streams.md index 2f9ec6ca12b57..0528982fce1b5 100644 --- a/docs/streams/developer-guide/config-streams.md +++ b/docs/streams/developer-guide/config-streams.md @@ -47,7 +47,7 @@ Kafka and Kafka Streams configuration options must be configured before using St # Configuration parameter reference -This section contains the most common Streams configuration parameters. For a full reference, see the [Streams](/43/javadoc/org/apache/kafka/streams/StreamsConfig.html) Javadocs. +This section contains the most common Streams configuration parameters. For a full reference, see the [Streams](/{version}/javadoc/org/apache/kafka/streams/StreamsConfig.html) Javadocs. * Required configuration parameters * application.id @@ -300,7 +300,7 @@ The minimum number of in-sync replicas available for replication if the producer ## Optional configuration parameters -Here are the optional [Streams](/43/javadoc/org/apache/kafka/streams/StreamsConfig.html) javadocs, sorted by level of importance: +Here are the optional [Streams](/{version}/javadoc/org/apache/kafka/streams/StreamsConfig.html) javadocs, sorted by level of importance: > * High: These are parameters with a default value which is most likely not a good fit for production use. It's highly recommended to revisit these parameters for production usage. > * Medium: The default values of these parameters should work for production for many cases, but it's not uncommon that they are changed, for example to tune performance. @@ -1190,8 +1190,8 @@ Serde for the inner class of a windowed record. Must implement the `Serde` inter > The deserialization exception handler allows you to manage record exceptions that fail to deserialize. This can be caused by corrupt data, incorrect serialization logic, or unhandled record types. The implemented exception handler needs to return a `FAIL` or `CONTINUE` depending on the record and the exception thrown. Returning `FAIL` will signal that Streams should shut down and `CONTINUE` will signal that Streams should ignore the issue and continue processing. The following library built-in exception handlers are available: > -> * [LogAndContinueExceptionHandler](/43/javadoc/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.html): This handler logs the deserialization exception and then signals the processing pipeline to continue processing more records. This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records that fail to deserialize. -> * [LogAndFailExceptionHandler](/43/javadoc/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.html). This handler logs the deserialization exception and then signals the processing pipeline to stop processing more records. +> * [LogAndContinueExceptionHandler](/{version}/javadoc/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.html): This handler logs the deserialization exception and then signals the processing pipeline to continue processing more records. This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records that fail to deserialize. +> * [LogAndFailExceptionHandler](/{version}/javadoc/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.html). This handler logs the deserialization exception and then signals the processing pipeline to stop processing more records. > > @@ -1226,7 +1226,7 @@ Serde for the inner class of a windowed record. 
Must implement the `Serde` inter ### production.exception.handler (deprecated: default.production.exception.handler) -> The production exception handler allows you to manage exceptions triggered when trying to interact with a broker such as attempting to produce a record that is too large. By default, Kafka provides and uses the [DefaultProductionExceptionHandler](/43/javadoc/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.html) that always fails when these exceptions occur. +> The production exception handler allows you to manage exceptions triggered when trying to interact with a broker such as attempting to produce a record that is too large. By default, Kafka provides and uses the [DefaultProductionExceptionHandler](/{version}/javadoc/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.html) that always fails when these exceptions occur. > > An exception handler can return `FAIL`, `CONTINUE`, or `RETRY` depending on the record and the exception thrown. Returning `FAIL` will signal that Streams should shut down. `CONTINUE` will signal that Streams should ignore the issue and continue processing. For `RetriableException` the handler may return `RETRY` to tell the runtime to retry sending the failed record (**Note:** If `RETRY` is returned for a non-`RetriableException` it will be treated as `FAIL`.) If you want to provide an exception handler that always ignores records that are too large, you could implement something like the following: > @@ -1260,9 +1260,9 @@ Serde for the inner class of a windowed record. Must implement the `Serde` inter ### default.timestamp.extractor -> A timestamp extractor pulls a timestamp from an instance of [ConsumerRecord](/43/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html). Timestamps are used to control the progress of streams. +> A timestamp extractor pulls a timestamp from an instance of [ConsumerRecord](/{version}/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html). Timestamps are used to control the progress of streams. > -> The default extractor is [FailOnInvalidTimestamp](/43/javadoc/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.html). This extractor retrieves built-in timestamps that are automatically embedded into Kafka messages by the Kafka producer client since [Kafka version 0.10](https://cwiki.apache.org/confluence/x/eaSnAw). Depending on the setting of Kafka's server-side `log.message.timestamp.type` broker and `message.timestamp.type` topic parameters, this extractor provides you with: +> The default extractor is [FailOnInvalidTimestamp](/{version}/javadoc/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.html). This extractor retrieves built-in timestamps that are automatically embedded into Kafka messages by the Kafka producer client since [Kafka version 0.10](https://cwiki.apache.org/confluence/x/eaSnAw). Depending on the setting of Kafka's server-side `log.message.timestamp.type` broker and `message.timestamp.type` topic parameters, this extractor provides you with: > > * **event-time** processing semantics if `log.message.timestamp.type` is set to `CreateTime` aka "producer time" (which is the default). This represents the time when a Kafka producer sent the original message. If you use Kafka's official producer client, the timestamp represents milliseconds since the epoch. > * **ingestion-time** processing semantics if `log.message.timestamp.type` is set to `LogAppendTime` aka "broker time". 
This represents the time when the Kafka broker received the original message, in milliseconds since the epoch. @@ -1273,12 +1273,12 @@ Serde for the inner class of a windowed record. Must implement the `Serde` inter > > If you have data with invalid timestamps and want to process it, then there are two alternative extractors available. Both work on built-in timestamps, but handle invalid timestamps differently. > -> * [LogAndSkipOnInvalidTimestamp](/43/javadoc/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.html): This extractor logs a warn message and returns the invalid timestamp to Kafka Streams, which will not process but silently drop the record. This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records with an invalid built-in timestamp in your input data. -> * [UsePartitionTimeOnInvalidTimestamp](/43/javadoc/org/apache/kafka/streams/processor/UsePartitionTimeOnInvalidTimestamp.html). This extractor returns the record's built-in timestamp if it is valid (i.e. not negative). If the record does not have a valid built-in timestamps, the extractor returns the previously extracted valid timestamp from a record of the same topic partition as the current record as a timestamp estimation. In case that no timestamp can be estimated, it throws an exception. +> * [LogAndSkipOnInvalidTimestamp](/{version}/javadoc/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.html): This extractor logs a warn message and returns the invalid timestamp to Kafka Streams, which will not process but silently drop the record. This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records with an invalid built-in timestamp in your input data. +> * [UsePartitionTimeOnInvalidTimestamp](/{version}/javadoc/org/apache/kafka/streams/processor/UsePartitionTimeOnInvalidTimestamp.html). This extractor returns the record's built-in timestamp if it is valid (i.e. not negative). If the record does not have a valid built-in timestamps, the extractor returns the previously extracted valid timestamp from a record of the same topic partition as the current record as a timestamp estimation. In case that no timestamp can be estimated, it throws an exception. > > -> Another built-in extractor is [WallclockTimestampExtractor](/43/javadoc/org/apache/kafka/streams/processor/WallclockTimestampExtractor.html). This extractor does not actually "extract" a timestamp from the consumed record but rather returns the current time in milliseconds from the system clock (think: `System.currentTimeMillis()`), which effectively means Streams will operate on the basis of the so-called **processing-time** of events. +> Another built-in extractor is [WallclockTimestampExtractor](/{version}/javadoc/org/apache/kafka/streams/processor/WallclockTimestampExtractor.html). This extractor does not actually "extract" a timestamp from the consumed record but rather returns the current time in milliseconds from the system clock (think: `System.currentTimeMillis()`), which effectively means Streams will operate on the basis of the so-called **processing-time** of events. > > You can also provide your own timestamp extractors, for instance to retrieve timestamps embedded in the payload of messages. If you cannot extract a valid timestamp, you can either throw an exception, return a negative timestamp, or estimate a timestamp. 
Returning a negative timestamp will result in data loss - the corresponding record will not be processed but silently dropped. If you want to estimate a new timestamp, you can use the value provided via `previousTimestamp` (i.e., a Kafka Streams timestamp estimation). Here is an example of a custom `TimestampExtractor` implementation: > @@ -1450,8 +1450,8 @@ Serde for the inner class of a windowed record. Must implement the `Serde` inter > > The following library built-in exception handlers are available: > -> * [LogAndContinueProcessingExceptionHandler](/43/javadoc/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.html): This handler logs the processing exception and then signals the processing pipeline to continue processing more records. This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records that fail to be processed. -> * [LogAndFailProcessingExceptionHandler](/43/javadoc/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.html). This handler logs the processing exception and then signals the processing pipeline to stop processing more records. +> * [LogAndContinueProcessingExceptionHandler](/{version}/javadoc/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.html): This handler logs the processing exception and then signals the processing pipeline to continue processing more records. This log-and-skip strategy allows Kafka Streams to make progress instead of failing if there are records that fail to be processed. +> * [LogAndFailProcessingExceptionHandler](/{version}/javadoc/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.html). This handler logs the processing exception and then signals the processing pipeline to stop processing more records. > > @@ -1506,7 +1506,7 @@ Serde for the inner class of a windowed record. Must implement the `Serde` inter ### rocksdb.config.setter -> The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default configuration for RocksDB, you can implement `RocksDBConfigSetter` and provide your custom class via [rocksdb.config.setter](/43/javadoc/org/apache/kafka/streams/state/RocksDBConfigSetter.html). +> The RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default configuration for RocksDB, you can implement `RocksDBConfigSetter` and provide your custom class via [rocksdb.config.setter](/{version}/javadoc/org/apache/kafka/streams/state/RocksDBConfigSetter.html). > > Here is an example that adjusts the memory size consumed by RocksDB. > @@ -1574,9 +1574,9 @@ Note that you need to do two things to enable optimizations. In addition to sett ### Kafka consumers, producer and admin client configuration parameters - You can specify parameters for the Kafka [consumers](/43/javadoc/org/apache/kafka/clients/consumer/package-summary.html), [producers](/43/javadoc/org/apache/kafka/clients/producer/package-summary.html), and [admin client](/43/javadoc/org/apache/kafka/kafka/clients/admin/package-summary.html) that are used internally. The consumer, producer and admin client settings are defined by specifying parameters in a `StreamsConfig` instance. 
+ You can specify parameters for the Kafka [consumers](/{version}/javadoc/org/apache/kafka/clients/consumer/package-summary.html), [producers](/{version}/javadoc/org/apache/kafka/clients/producer/package-summary.html), and [admin client](/{version}/javadoc/org/apache/kafka/kafka/clients/admin/package-summary.html) that are used internally. The consumer, producer and admin client settings are defined by specifying parameters in a `StreamsConfig` instance. - In this example, the Kafka [consumer session timeout](/43/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#SESSION_TIMEOUT_MS_CONFIG) is configured to be 60000 milliseconds in the Streams settings: + In this example, the Kafka [consumer session timeout](/{version}/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#SESSION_TIMEOUT_MS_CONFIG) is configured to be 60000 milliseconds in the Streams settings: Properties streamsSettings = new Properties(); diff --git a/docs/streams/developer-guide/dsl-api.md b/docs/streams/developer-guide/dsl-api.md index c12ef438ad2dc..ff09a9c1e87e0 100644 --- a/docs/streams/developer-guide/dsl-api.md +++ b/docs/streams/developer-guide/dsl-api.md @@ -75,7 +75,7 @@ With the DSL, you can define [processor topologies](../core-concepts.html#stream After the application is run, the defined processor topologies are continuously executed (i.e., the processing plan is put into action). A step-by-step guide for writing a stream processing application using the DSL is provided below. -For a complete list of available API functionality, see also the [Streams](/43/javadoc/org/apache/kafka/streams/package-summary.html) API docs. +For a complete list of available API functionality, see also the [Streams](/{version}/javadoc/org/apache/kafka/streams/package-summary.html) API docs. ### KStream @@ -167,7 +167,7 @@ Description -Creates a KStream from the specified Kafka input topics and interprets the data as a record stream. A `KStream` represents a _partitioned_ record stream. [(details)](/43/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream\(java.lang.String\)) +Creates a KStream from the specified Kafka input topics and interprets the data as a record stream. A `KStream` represents a _partitioned_ record stream. [(details)](/{version}/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream\(java.lang.String\)) In the case of a KStream, the local KStream instance of every application instance will be populated with data from only **a subset** of the partitions of the input topic. Collectively, across all application instances, all input topic partitions are read and processed. @@ -208,7 +208,7 @@ Several variants of `stream` exist. For example, you can specify a regex pattern -Reads the specified Kafka input topic into a KTable. The topic is interpreted as a changelog stream, where records with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not `null`) or as DELETE (when the value is `null`) for that key. [(details)](/43/javadoc/org/apache/kafka/streams/StreamsBuilder.html#table-java.lang.String\(java.lang.String\)) +Reads the specified Kafka input topic into a KTable. The topic is interpreted as a changelog stream, where records with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not `null`) or as DELETE (when the value is `null`) for that key. 
[(details)](/{version}/javadoc/org/apache/kafka/streams/StreamsBuilder.html#table-java.lang.String\(java.lang.String\)) In the case of a KTable, the local KTable instance of every application instance will be populated with data from only **a subset** of the partitions of the input topic. Collectively, across all application instances, all input topic partitions are read and processed. @@ -237,7 +237,7 @@ Several variants of `table` exist, for example to specify the `auto.offset.reset -Reads the specified Kafka input topic into a GlobalKTable. The topic is interpreted as a changelog stream, where records with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not `null`) or as DELETE (when the value is `null`) for that key. [(details)](/43/javadoc/org/apache/kafka/streams/StreamsBuilder.html#globalTable-java.lang.String\(java.lang.String\)) +Reads the specified Kafka input topic into a GlobalKTable. The topic is interpreted as a changelog stream, where records with the same key are interpreted as UPSERT aka INSERT/UPDATE (when the record value is not `null`) or as DELETE (when the value is `null`) for that key. [(details)](/{version}/javadoc/org/apache/kafka/streams/StreamsBuilder.html#globalTable-java.lang.String\(java.lang.String\)) In the case of a GlobalKTable, the local GlobalKTable instance of every application instance will be populated with data from **all** the partitions of the input topic. @@ -311,7 +311,7 @@ Description -Branch (or split) a `KStream` based on the supplied predicates into one or more `KStream` instances. ([details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#split\(\))) +Branch (or split) a `KStream` based on the supplied predicates into one or more `KStream` instances. ([details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#split\(\))) Predicates are evaluated in order. A record is placed to one and only one output stream on the first match: if the n-th predicate evaluates to true, the record is placed to n-th stream. If a record does not match any predicates, it will be routed to the default branch, or dropped if no default branch is created. @@ -392,7 +392,7 @@ A multicast is implemented as a broadcast plus filters. -Evaluates a boolean function for each element and retains those for which the function returns true. ([KStream details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#filter-org.apache.kafka.streams.kstream.Predicate-), [KTable details](/43/javadoc/org/apache/kafka/streams/kstream/KTable.html#filter-org.apache.kafka.streams.kstream.Predicate-)) +Evaluates a boolean function for each element and retains those for which the function returns true. ([KStream details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#filter-org.apache.kafka.streams.kstream.Predicate-), [KTable details](/{version}/javadoc/org/apache/kafka/streams/kstream/KTable.html#filter-org.apache.kafka.streams.kstream.Predicate-)) KStream stream = ...; @@ -419,7 +419,7 @@ Evaluates a boolean function for each element and retains those for which the fu -Evaluates a boolean function for each element and drops those for which the function returns true. 
([KStream details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#filterNot-org.apache.kafka.streams.kstream.Predicate-), [KTable details](/43/javadoc/org/apache/kafka/streams/kstream/KTable.html#filterNot-org.apache.kafka.streams.kstream.Predicate-)) +Evaluates a boolean function for each element and drops those for which the function returns true. ([KStream details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#filterNot-org.apache.kafka.streams.kstream.Predicate-), [KTable details](/{version}/javadoc/org/apache/kafka/streams/kstream/KTable.html#filterNot-org.apache.kafka.streams.kstream.Predicate-)) KStream stream = ...; @@ -446,7 +446,7 @@ Evaluates a boolean function for each element and drops those for which the func -Takes one record and produces zero, one, or more records. You can modify the record keys and values, including their types. ([details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#flatMap-org.apache.kafka.streams.kstream.KeyValueMapper-)) +Takes one record and produces zero, one, or more records. You can modify the record keys and values, including their types. ([details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#flatMap-org.apache.kafka.streams.kstream.KeyValueMapper-)) **Marks the stream for data re-partitioning:** Applying a grouping or a join after `flatMap` will result in re-partitioning of the records. If possible use `flatMapValues` instead, which will not cause data re-partitioning. @@ -482,7 +482,7 @@ Takes one record and produces zero, one, or more records. You can modify the rec -Takes one record and produces zero, one, or more records, while retaining the key of the original record. You can modify the record values and the value type. ([details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#flatMapValues-org.apache.kafka.streams.kstream.ValueMapper-)) +Takes one record and produces zero, one, or more records, while retaining the key of the original record. You can modify the record values and the value type. ([details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#flatMapValues-org.apache.kafka.streams.kstream.ValueMapper-)) `flatMapValues` is preferable to `flatMap` because it will not cause data re-partitioning. However, you cannot modify the key or key type like `flatMap` does. @@ -511,7 +511,7 @@ Takes one record and produces zero, one, or more records, while retaining the ke -**Terminal operation.** Performs a stateless action on each record. ([details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#foreach-org.apache.kafka.streams.kstream.ForeachAction-)) +**Terminal operation.** Performs a stateless action on each record. ([details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#foreach-org.apache.kafka.streams.kstream.ForeachAction-)) You would use `foreach` to cause _side effects_ based on the input data (similar to `peek`) and then _stop_ _further processing_ of the input data (unlike `peek`, which is not a terminal operation). @@ -541,7 +541,7 @@ You would use `foreach` to cause _side effects_ based on the input data (similar -Groups the records by the existing key. ([details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#groupByKey--)) +Groups the records by the existing key. ([details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#groupByKey--)) Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly partitioned ("keyed") for subsequent operations. 
@@ -586,7 +586,7 @@ Grouping is a prerequisite for aggregating a stream or a table and ensures that -Groups the records by a _new_ key, which may be of a different key type. When grouping a table, you may also specify a new value and value type. `groupBy` is a shorthand for `selectKey(...).groupByKey()`. ([KStream details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#groupBy-org.apache.kafka.streams.kstream.KeyValueMapper-), [KTable details](/43/javadoc/org/apache/kafka/streams/kstream/KTable.html#groupBy-org.apache.kafka.streams.kstream.KeyValueMapper-)) +Groups the records by a _new_ key, which may be of a different key type. When grouping a table, you may also specify a new value and value type. `groupBy` is a shorthand for `selectKey(...).groupByKey()`. ([KStream details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#groupBy-org.apache.kafka.streams.kstream.KeyValueMapper-), [KTable details](/{version}/javadoc/org/apache/kafka/streams/kstream/KTable.html#groupBy-org.apache.kafka.streams.kstream.KeyValueMapper-)) Grouping is a prerequisite for aggregating a stream or a table and ensures that data is properly partitioned ("keyed") for subsequent operations. @@ -638,7 +638,7 @@ Grouping is a prerequisite for aggregating a stream or a table and ensures that -Cogrouping allows to aggregate multiple input streams in a single operation. The different (already grouped) input streams must have the same key type and may have different values types. [KGroupedStream#cogroup()](/43/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html#cogroup) creates a new cogrouped stream with a single input stream, while [CogroupedKStream#cogroup()](/43/javadoc/org/apache/kafka/streams/kstream/CogroupedKStream.html#cogroup) adds a grouped stream to an existing cogrouped stream. A `CogroupedKStream` may be [windowed](/43/javadoc/org/apache/kafka/streams/kstream/CogroupedKStream.html#windowedBy) before it is [aggregated](/43/javadoc/org/apache/kafka/streams/kstream/CogroupedKStream.html#aggregate). +Cogrouping allows to aggregate multiple input streams in a single operation. The different (already grouped) input streams must have the same key type and may have different values types. [KGroupedStream#cogroup()](/{version}/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html#cogroup) creates a new cogrouped stream with a single input stream, while [CogroupedKStream#cogroup()](/{version}/javadoc/org/apache/kafka/streams/kstream/CogroupedKStream.html#cogroup) adds a grouped stream to an existing cogrouped stream. A `CogroupedKStream` may be [windowed](/{version}/javadoc/org/apache/kafka/streams/kstream/CogroupedKStream.html#windowedBy) before it is [aggregated](/{version}/javadoc/org/apache/kafka/streams/kstream/CogroupedKStream.html#aggregate). Cogroup does not cause a repartition as it has the prerequisite that the input streams are grouped. In the process of creating these groups they will have already been repartitioned if the stream was already marked for repartitioning. @@ -673,7 +673,7 @@ Cogroup does not cause a repartition as it has the prerequisite that the input s -Takes one record and produces one record. You can modify the record key and value, including their types. ([details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#map-org.apache.kafka.streams.kstream.KeyValueMapper-)) +Takes one record and produces one record. You can modify the record key and value, including their types. 
([details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#map-org.apache.kafka.streams.kstream.KeyValueMapper-)) **Marks the stream for data re-partitioning:** Applying a grouping or a join after `map` will result in re-partitioning of the records. If possible use `mapValues` instead, which will not cause data re-partitioning. @@ -704,7 +704,7 @@ Takes one record and produces one record. You can modify the record key and valu -Takes one record and produces one record, while retaining the key of the original record. You can modify the record value and the value type. ([KStream details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#mapValues-org.apache.kafka.streams.kstream.ValueMapper-), [KTable details](/43/javadoc/org/apache/kafka/streams/kstream/KTable.html#mapValues-org.apache.kafka.streams.kstream.ValueMapper-)) +Takes one record and produces one record, while retaining the key of the original record. You can modify the record value and the value type. ([KStream details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#mapValues-org.apache.kafka.streams.kstream.ValueMapper-), [KTable details](/{version}/javadoc/org/apache/kafka/streams/kstream/KTable.html#mapValues-org.apache.kafka.streams.kstream.ValueMapper-)) `mapValues` is preferable to `map` because it will not cause data re-partitioning. However, it does not allow you to modify the key or key type like `map` does. @@ -731,7 +731,7 @@ Takes one record and produces one record, while retaining the key of the origina -Merges records of two streams into one larger stream. ([details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#merge-org.apache.kafka.streams.kstream.KStream-)) +Merges records of two streams into one larger stream. ([details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#merge-org.apache.kafka.streams.kstream.KStream-)) There is no ordering guarantee between records from different streams in the merged stream. Relative order is preserved within each input stream though (ie, records within the same input stream are processed in order) @@ -759,7 +759,7 @@ There is no ordering guarantee between records from different streams in the mer -Performs a stateless action on each record, and returns an unchanged stream. ([details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#peek-org.apache.kafka.streams.kstream.ForeachAction-)) +Performs a stateless action on each record, and returns an unchanged stream. ([details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#peek-org.apache.kafka.streams.kstream.ForeachAction-)) You would use `peek` to cause _side effects_ based on the input data (similar to `foreach`) and _continue_ _processing_ the input data (unlike `foreach`, which is a terminal operation). `peek` returns the input stream as-is; if you need to modify the input stream, use `map` or `mapValues` instead. @@ -791,7 +791,7 @@ You would use `peek` to cause _side effects_ based on the input data (similar to -**Terminal operation.** Prints the records to `System.out`. See Javadocs for serde and `toString()` caveats. ([details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#print--)) +**Terminal operation.** Prints the records to `System.out`. See Javadocs for serde and `toString()` caveats. 
([details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#print--)) Calling `print()` is the same as calling `foreach((key, value) -> System.out.println(key + ", " + value))` @@ -822,7 +822,7 @@ Calling `print()` is the same as calling `foreach((key, value) -> System.out.pri -Assigns a new key - possibly of a new key type - to each record. ([details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#selectKey-org.apache.kafka.streams.kstream.KeyValueMapper-)) +Assigns a new key - possibly of a new key type - to each record. ([details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#selectKey-org.apache.kafka.streams.kstream.KeyValueMapper-)) Calling `selectKey(mapper)` is the same as calling `map((key, value) -> mapper(key, value), value)`. @@ -852,7 +852,7 @@ Calling `selectKey(mapper)` is the same as calling `map((key, value) -> mapper(k -Get the changelog stream of this table. ([details](/43/javadoc/org/apache/kafka/streams/kstream/KTable.html#toStream--)) +Get the changelog stream of this table. ([details](/{version}/javadoc/org/apache/kafka/streams/kstream/KTable.html#toStream--)) KTable table = ...; @@ -878,7 +878,7 @@ Get the changelog stream of this table. ([details](/43/javadoc/org/apache/kafka/ -Convert an event stream into a table, or say a changelog stream. ([details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#toTable--)) +Convert an event stream into a table, or say a changelog stream. ([details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#toTable--)) KStream stream = ...; @@ -902,7 +902,7 @@ Convert an event stream into a table, or say a changelog stream. ([details](/43/ -Manually trigger repartitioning of the stream with desired number of partitions. ([details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#repartition--)) +Manually trigger repartitioning of the stream with desired number of partitions. ([details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#repartition--)) Kafka Streams will manage the topic for `repartition()`. Generated topic is treated as internal topic, as a result data will be purged automatically as any other internal repartition topic. In addition, you can specify the desired number of partitions, which allows to easily scale in/out downstream sub-topologies. `repartition()` operation always triggers repartitioning of the stream, as a result it can be used with embedded Processor API methods (like `process()` et al.) that do not trigger auto repartitioning when key changing operation is performed beforehand. 
@@ -919,9 +919,9 @@ Stateful transformations depend on state for processing inputs and producing out **Note:** Following store types are used regardless of the possibly specified type (via the parameter `materialized`): - * non-windowed aggregations and non-windowed KTables use [TimestampedKeyValueStore](/43/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html)s or [VersionedKeyValueStore](/43/javadoc/org/apache/kafka/streams/state/VersionedKeyValueStore.html)s, depending on whether the parameter `materialized` is versioned - * time-windowed aggregations and KStream-KStream joins use [TimestampedWindowStore](/43/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html)s - * session windowed aggregations use [SessionStore](/43/javadoc/org/apache/kafka/streams/state/SessionStore.html)s (there is no timestamped session store as of now) + * non-windowed aggregations and non-windowed KTables use [TimestampedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html)s or [VersionedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/VersionedKeyValueStore.html)s, depending on whether the parameter `materialized` is versioned + * time-windowed aggregations and KStream-KStream joins use [TimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html)s + * session windowed aggregations use [SessionStore](/{version}/javadoc/org/apache/kafka/streams/state/SessionStore.html)s (there is no timestamped session store as of now) @@ -996,7 +996,7 @@ Description -**Rolling aggregation.** Aggregates the values of (non-windowed) records by the grouped key or cogrouped. Aggregating is a generalization of `reduce` and allows, for example, the aggregate value to have a different type than the input values. ([KGroupedStream details](/43/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html), [KGroupedTable details](/43/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html) [KGroupedTable details](/43/javadoc/org/apache/kafka/streams/kstream/CogroupedKStream.html)) +**Rolling aggregation.** Aggregates the values of (non-windowed) records by the grouped key or cogrouped. Aggregating is a generalization of `reduce` and allows, for example, the aggregate value to have a different type than the input values. ([KGroupedStream details](/{version}/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html), [KGroupedTable details](/{version}/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html) [KGroupedTable details](/{version}/javadoc/org/apache/kafka/streams/kstream/CogroupedKStream.html)) When aggregating a _grouped stream_ , you must provide an initializer (e.g., `aggValue = 0`) and an "adder" aggregator (e.g., `aggValue + curValue`). When aggregating a _grouped table_ , you must additionally provide a "subtractor" aggregator (think: `aggValue - oldValue`). @@ -1061,7 +1061,7 @@ See the example at the bottom of this section for a visualization of the aggrega -**Windowed aggregation.** Aggregates the values of records, per window, by the grouped key. Aggregating is a generalization of `reduce` and allows, for example, the aggregate value to have a different type than the input values. ([TimeWindowedKStream details](/43/javadoc/org/apache/kafka/streams/kstream/TimeWindowedKStream.html), [SessionWindowedKStream details](/43/javadoc/org/apache/kafka/streams/kstream/SessionWindowedKStream.html)) +**Windowed aggregation.** Aggregates the values of records, per window, by the grouped key. 
Aggregating is a generalization of `reduce` and allows, for example, the aggregate value to have a different type than the input values. ([TimeWindowedKStream details](/{version}/javadoc/org/apache/kafka/streams/kstream/TimeWindowedKStream.html), [SessionWindowedKStream details](/{version}/javadoc/org/apache/kafka/streams/kstream/SessionWindowedKStream.html)) You must provide an initializer (e.g., `aggValue = 0`), "adder" aggregator (e.g., `aggValue + curValue`), and a window. When windowing based on sessions, you must additionally provide a "session merger" aggregator (e.g., `mergedAggValue = leftAggValue + rightAggValue`). @@ -1129,7 +1129,7 @@ See the example at the bottom of this section for a visualization of the aggrega -**Rolling aggregation.** Counts the number of records by the grouped key. ([KGroupedStream details](/43/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html), [KGroupedTable details](/43/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html)) +**Rolling aggregation.** Counts the number of records by the grouped key. ([KGroupedStream details](/{version}/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html), [KGroupedTable details](/{version}/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html)) Several variants of `count` exist, see Javadocs for details. @@ -1170,7 +1170,7 @@ Detailed behavior for `KGroupedTable`: -**Windowed aggregation.** Counts the number of records, per window, by the grouped key. ([TimeWindowedKStream details](/43/javadoc/org/apache/kafka/streams/kstream/TimeWindowedKStream.html), [SessionWindowedKStream details](/43/javadoc/org/apache/kafka/streams/kstream/SessionWindowedKStream.html)) +**Windowed aggregation.** Counts the number of records, per window, by the grouped key. ([TimeWindowedKStream details](/{version}/javadoc/org/apache/kafka/streams/kstream/TimeWindowedKStream.html), [SessionWindowedKStream details](/{version}/javadoc/org/apache/kafka/streams/kstream/SessionWindowedKStream.html)) The windowed `count` turns a `TimeWindowedKStream` or `SessionWindowedKStream` into a windowed `KTable, V>`. @@ -1217,7 +1217,7 @@ Detailed behavior: -**Rolling aggregation.** Combines the values of (non-windowed) records by the grouped key. The current record value is combined with the last reduced value, and a new reduced value is returned. The result value type cannot be changed, unlike `aggregate`. ([KGroupedStream details](/43/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html), [KGroupedTable details](/43/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html)) +**Rolling aggregation.** Combines the values of (non-windowed) records by the grouped key. The current record value is combined with the last reduced value, and a new reduced value is returned. The result value type cannot be changed, unlike `aggregate`. ([KGroupedStream details](/{version}/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html), [KGroupedTable details](/{version}/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html)) When reducing a _grouped stream_ , you must provide an "adder" reducer (e.g., `aggValue + curValue`). When reducing a _grouped table_ , you must additionally provide a "subtractor" reducer (e.g., `aggValue - oldValue`). @@ -1274,7 +1274,7 @@ See the example at the bottom of this section for a visualization of the aggrega -**Windowed aggregation.** Combines the values of records, per window, by the grouped key. 
The current record value is combined with the last reduced value, and a new reduced value is returned. Records with `null` key or value are ignored. The result value type cannot be changed, unlike `aggregate`. ([TimeWindowedKStream details](/43/javadoc/org/apache/kafka/streams/kstream/TimeWindowedKStream.html), [SessionWindowedKStream details](/43/javadoc/org/apache/kafka/streams/kstream/SessionWindowedKStream.html)) +**Windowed aggregation.** Combines the values of records, per window, by the grouped key. The current record value is combined with the last reduced value, and a new reduced value is returned. Records with `null` key or value are ignored. The result value type cannot be changed, unlike `aggregate`. ([TimeWindowedKStream details](/{version}/javadoc/org/apache/kafka/streams/kstream/TimeWindowedKStream.html), [SessionWindowedKStream details](/{version}/javadoc/org/apache/kafka/streams/kstream/SessionWindowedKStream.html)) The windowed `reduce` turns a `TimeWindowedKStream` or a `SessionWindowedKStream` into a windowed `KTable, V>`. @@ -2154,7 +2154,7 @@ Description -Performs an INNER JOIN of this stream with another stream. Even though this operation is windowed, the joined stream will be of type `KStream` rather than `KStream, ...>`. [(details)](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#join-org.apache.kafka.streams.kstream.KStream-org.apache.kafka.streams.kstream.ValueJoiner-org.apache.kafka.streams.kstream.JoinWindows-) +Performs an INNER JOIN of this stream with another stream. Even though this operation is windowed, the joined stream will be of type `KStream` rather than `KStream, ...>`. [(details)](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#join-org.apache.kafka.streams.kstream.KStream-org.apache.kafka.streams.kstream.ValueJoiner-org.apache.kafka.streams.kstream.JoinWindows-) **Data must be co-partitioned** : The input data for both sides must be co-partitioned. @@ -2207,7 +2207,7 @@ See the semantics overview at the bottom of this section for a detailed descript -Performs a LEFT JOIN of this stream with another stream. Even though this operation is windowed, the joined stream will be of type `KStream` rather than `KStream, ...>`. [(details)](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#leftJoin-org.apache.kafka.streams.kstream.KStream-org.apache.kafka.streams.kstream.ValueJoiner-org.apache.kafka.streams.kstream.JoinWindows-) +Performs a LEFT JOIN of this stream with another stream. Even though this operation is windowed, the joined stream will be of type `KStream` rather than `KStream, ...>`. [(details)](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#leftJoin-org.apache.kafka.streams.kstream.KStream-org.apache.kafka.streams.kstream.ValueJoiner-org.apache.kafka.streams.kstream.JoinWindows-) **Data must be co-partitioned** : The input data for both sides must be co-partitioned. @@ -2262,7 +2262,7 @@ See the semantics overview at the bottom of this section for a detailed descript -Performs an OUTER JOIN of this stream with another stream. Even though this operation is windowed, the joined stream will be of type `KStream` rather than `KStream, ...>`. [(details)](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#outerJoin-org.apache.kafka.streams.kstream.KStream-org.apache.kafka.streams.kstream.ValueJoiner-org.apache.kafka.streams.kstream.JoinWindows-) +Performs an OUTER JOIN of this stream with another stream. 
Even though this operation is windowed, the joined stream will be of type `KStream` rather than `KStream, ...>`. [(details)](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#outerJoin-org.apache.kafka.streams.kstream.KStream-org.apache.kafka.streams.kstream.ValueJoiner-org.apache.kafka.streams.kstream.JoinWindows-) **Data must be co-partitioned** : The input data for both sides must be co-partitioned. @@ -2307,7 +2307,7 @@ See the semantics overview at the bottom of this section for a detailed descript **Note:** If you use the old and now deprecated API to specify the grace period, i.e., `JoinWindows.of(...).grace(...)`, left/outer join results are emitted eagerly, and the observed result might differ from the result shown below. -The columns INNER JOIN, LEFT JOIN, and OUTER JOIN denote what is passed as arguments to the user-supplied [ValueJoiner](/43/javadoc/org/apache/kafka/streams/kstream/ValueJoiner.html) for the `join`, `leftJoin`, and `outerJoin` methods, respectively, whenever a new input record is received on either side of the join. An empty table cell denotes that the `ValueJoiner` is not called at all. +The columns INNER JOIN, LEFT JOIN, and OUTER JOIN denote what is passed as arguments to the user-supplied [ValueJoiner](/{version}/javadoc/org/apache/kafka/streams/kstream/ValueJoiner.html) for the `join`, `leftJoin`, and `outerJoin` methods, respectively, whenever a new input record is received on either side of the join. An empty table cell denotes that the `ValueJoiner` is not called at all. @@ -2952,7 +2952,7 @@ Description -Performs an INNER JOIN of this table with another table. The result is an ever-updating KTable that represents the "current" result of the join. [(details)](/43/javadoc/org/apache/kafka/streams/kstream/KTable.html#join-org.apache.kafka.streams.kstream.KTable-org.apache.kafka.streams.kstream.ValueJoiner-) +Performs an INNER JOIN of this table with another table. The result is an ever-updating KTable that represents the "current" result of the join. [(details)](/{version}/javadoc/org/apache/kafka/streams/kstream/KTable.html#join-org.apache.kafka.streams.kstream.KTable-org.apache.kafka.streams.kstream.ValueJoiner-) **Data must be co-partitioned** : The input data for both sides must be co-partitioned. @@ -2997,7 +2997,7 @@ See the semantics overview at the bottom of this section for a detailed descript -Performs a LEFT JOIN of this table with another table. [(details)](/43/javadoc/org/apache/kafka/streams/kstream/KTable.html#leftJoin-org.apache.kafka.streams.kstream.KTable-org.apache.kafka.streams.kstream.ValueJoiner-) +Performs a LEFT JOIN of this table with another table. [(details)](/{version}/javadoc/org/apache/kafka/streams/kstream/KTable.html#leftJoin-org.apache.kafka.streams.kstream.KTable-org.apache.kafka.streams.kstream.ValueJoiner-) **Data must be co-partitioned** : The input data for both sides must be co-partitioned. @@ -3044,7 +3044,7 @@ See the semantics overview at the bottom of this section for a detailed descript -Performs an OUTER JOIN of this table with another table. [(details)](/43/javadoc/org/apache/kafka/streams/kstream/KTable.html#outerJoin-org.apache.kafka.streams.kstream.KTable-org.apache.kafka.streams.kstream.ValueJoiner-) +Performs an OUTER JOIN of this table with another table. 
[(details)](/{version}/javadoc/org/apache/kafka/streams/kstream/KTable.html#outerJoin-org.apache.kafka.streams.kstream.KTable-org.apache.kafka.streams.kstream.ValueJoiner-) **Data must be co-partitioned** : The input data for both sides must be co-partitioned. @@ -3077,7 +3077,7 @@ See the semantics overview at the bottom of this section for a detailed descript
-**Semantics of table-table equi-joins:** The semantics of the various table-table equi-join variants are explained below. To improve the readability of the table, you can assume that (1) all records have the same key (and thus the key in the table is omitted) and that (2) all records are processed in timestamp order. The columns INNER JOIN, LEFT JOIN, and OUTER JOIN denote what is passed as arguments to the user-supplied [ValueJoiner](/43/javadoc/org/apache/kafka/streams/kstream/ValueJoiner.html) for the `join`, `leftJoin`, and `outerJoin` methods, respectively, whenever a new input record is received on either side of the join. An empty table cell denotes that the `ValueJoiner` is not called at all. +**Semantics of table-table equi-joins:** The semantics of the various table-table equi-join variants are explained below. To improve the readability of the table, you can assume that (1) all records have the same key (and thus the key in the table is omitted) and that (2) all records are processed in timestamp order. The columns INNER JOIN, LEFT JOIN, and OUTER JOIN denote what is passed as arguments to the user-supplied [ValueJoiner](/{version}/javadoc/org/apache/kafka/streams/kstream/ValueJoiner.html) for the `join`, `leftJoin`, and `outerJoin` methods, respectively, whenever a new input record is received on either side of the join. An empty table cell denotes that the `ValueJoiner` is not called at all. @@ -3875,7 +3875,7 @@ Description -Performs an INNER JOIN of this stream with the table, effectively doing a table lookup. [(details)](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#join-org.apache.kafka.streams.kstream.KTable-org.apache.kafka.streams.kstream.ValueJoiner-) +Performs an INNER JOIN of this stream with the table, effectively doing a table lookup. [(details)](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#join-org.apache.kafka.streams.kstream.KTable-org.apache.kafka.streams.kstream.ValueJoiner-) **Data must be co-partitioned** : The input data for both sides must be co-partitioned. @@ -3929,7 +3929,7 @@ See the semantics overview at the bottom of this section for a detailed descript -Performs a LEFT JOIN of this stream with the table, effectively doing a table lookup. [(details)](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#leftJoin-org.apache.kafka.streams.kstream.KTable-org.apache.kafka.streams.kstream.ValueJoiner-) +Performs a LEFT JOIN of this stream with the table, effectively doing a table lookup. [(details)](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#leftJoin-org.apache.kafka.streams.kstream.KTable-org.apache.kafka.streams.kstream.ValueJoiner-) **Data must be co-partitioned** : The input data for both sides must be co-partitioned. @@ -3971,7 +3971,7 @@ See the semantics overview at the bottom of this section for a detailed descript
-**Semantics of stream-table joins:** The semantics of the various stream-table join variants are explained below. To improve the readability of the table we assume that (1) all records have the same key (and thus we omit the key in the table) and that (2) all records are processed in timestamp order. The columns INNER JOIN and LEFT JOIN denote what is passed as arguments to the user-supplied [ValueJoiner](/43/javadoc/org/apache/kafka/streams/kstream/ValueJoiner.html) for the `join` and `leftJoin` methods, respectively, whenever a new input record is received on either side of the join. An empty table cell denotes that the `ValueJoiner` is not called at all. +**Semantics of stream-table joins:** The semantics of the various stream-table join variants are explained below. To improve the readability of the table we assume that (1) all records have the same key (and thus we omit the key in the table) and that (2) all records are processed in timestamp order. The columns INNER JOIN and LEFT JOIN denote what is passed as arguments to the user-supplied [ValueJoiner](/{version}/javadoc/org/apache/kafka/streams/kstream/ValueJoiner.html) for the `join` and `leftJoin` methods, respectively, whenever a new input record is received on either side of the join. An empty table cell denotes that the `ValueJoiner` is not called at all. @@ -4362,7 +4362,7 @@ Description -Performs an INNER JOIN of this stream with the global table, effectively doing a table lookup. [(details)](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#join-org.apache.kafka.streams.kstream.GlobalKTable-org.apache.kafka.streams.kstream.KeyValueMapper-org.apache.kafka.streams.kstream.ValueJoiner-) +Performs an INNER JOIN of this stream with the global table, effectively doing a table lookup. [(details)](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#join-org.apache.kafka.streams.kstream.GlobalKTable-org.apache.kafka.streams.kstream.KeyValueMapper-org.apache.kafka.streams.kstream.ValueJoiner-) The `GlobalKTable` is fully bootstrapped upon (re)start of a `KafkaStreams` instance, which means the table is fully populated with all the data in the underlying topic that is available at the time of the startup. The actual data processing begins only once the bootstrapping has completed. @@ -4406,7 +4406,7 @@ Detailed behavior: -Performs a LEFT JOIN of this stream with the global table, effectively doing a table lookup. [(details)](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#leftJoin-org.apache.kafka.streams.kstream.GlobalKTable-org.apache.kafka.streams.kstream.KeyValueMapper-org.apache.kafka.streams.kstream.ValueJoiner-) +Performs a LEFT JOIN of this stream with the global table, effectively doing a table lookup. [(details)](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#leftJoin-org.apache.kafka.streams.kstream.GlobalKTable-org.apache.kafka.streams.kstream.KeyValueMapper-org.apache.kafka.streams.kstream.ValueJoiner-) The `GlobalKTable` is fully bootstrapped upon (re)start of a `KafkaStreams` instance, which means the table is fully populated with all the data in the underlying topic that is available at the time of the startup. The actual data processing begins only once the bootstrapping has completed. @@ -5693,7 +5693,7 @@ Description -**Terminal operation.** Write the records to Kafka topic(s). ([KStream details](/43/javadoc/org/apache/kafka/streams/kstream/KStream.html#to\(java.lang.String\))) +**Terminal operation.** Write the records to Kafka topic(s). 
([KStream details](/{version}/javadoc/org/apache/kafka/streams/kstream/KStream.html#to\(java.lang.String\))) When to provide serdes explicitly: diff --git a/docs/streams/developer-guide/interactive-queries.md b/docs/streams/developer-guide/interactive-queries.md index eb228432056f4..48ec2af84e063 100644 --- a/docs/streams/developer-guide/interactive-queries.md +++ b/docs/streams/developer-guide/interactive-queries.md @@ -384,7 +384,7 @@ There are many ways to add an RPC layer. The only requirements are that the RPC ## Exposing the RPC endpoints of your application -To enable remote state store discovery in a distributed Kafka Streams application, you must set the [configuration property](config-streams.html#streams-developer-guide-required-configs) in the config properties. The `application.server` property defines a unique `host:port` pair that points to the RPC endpoint of the respective instance of a Kafka Streams application. The value of this configuration property will vary across the instances of your application. When this property is set, Kafka Streams will keep track of the RPC endpoint information for every instance of an application, its state stores, and assigned stream partitions through instances of [StreamsMetadata](/43/javadoc/org/apache/kafka/streams/state/StreamsMetadata.html). +To enable remote state store discovery in a distributed Kafka Streams application, you must set the [configuration property](config-streams.html#streams-developer-guide-required-configs) in the config properties. The `application.server` property defines a unique `host:port` pair that points to the RPC endpoint of the respective instance of a Kafka Streams application. The value of this configuration property will vary across the instances of your application. When this property is set, Kafka Streams will keep track of the RPC endpoint information for every instance of an application, its state stores, and assigned stream partitions through instances of [StreamsMetadata](/{version}/javadoc/org/apache/kafka/streams/state/StreamsMetadata.html). **Tip** @@ -429,7 +429,7 @@ This example shows how to configure and run a Kafka Streams application that sup ## Discovering and accessing application instances and their local state stores -The following methods return [StreamsMetadata](/43/javadoc/org/apache/kafka/streams/state/StreamsMetadata.html) objects, which provide meta-information about application instances such as their RPC endpoint and locally available state stores. +The following methods return [StreamsMetadata](/{version}/javadoc/org/apache/kafka/streams/state/StreamsMetadata.html) objects, which provide meta-information about application instances such as their RPC endpoint and locally available state stores. * `KafkaStreams#allMetadata()`: find all instances of this application * `KafkaStreams#allMetadataForStore(String storeName)`: find those applications instances that manage local instances of the state store "storeName" @@ -440,7 +440,7 @@ The following methods return [StreamsMetadata](/43/javadoc/org/apache/kafka/stre Attention -If `application.server` is not configured for an application instance, then the above methods will not find any [StreamsMetadata](/43/javadoc/org/apache/kafka/streams/state/StreamsMetadata.html) for it. +If `application.server` is not configured for an application instance, then the above methods will not find any [StreamsMetadata](/{version}/javadoc/org/apache/kafka/streams/state/StreamsMetadata.html) for it. 
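As a rough sketch of this discovery flow — the application id, bootstrap servers, and advertised endpoint are illustrative assumptions, and the store name matches the "word-count" example used below — an instance can advertise its RPC endpoint through `application.server` and then look up which instances host a given store:

    import java.util.Collection;
    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.state.StreamsMetadata;

    public class DiscoverySketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");    // hypothetical
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // hypothetical
            // Advertise this instance's RPC endpoint so other instances can discover it.
            props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "host1.example.com:7070");

            StreamsBuilder builder = new StreamsBuilder();
            // ... build a topology that materializes a state store named "word-count" ...

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();

            // Find every instance of this application that hosts a shard of "word-count".
            Collection<StreamsMetadata> hosts = streams.allMetadataForStore("word-count");
            for (StreamsMetadata metadata : hosts) {
                System.out.println("word-count is served at "
                    + metadata.hostInfo().host() + ":" + metadata.hostInfo().port());
            }
        }
    }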
For example, we can now find the `StreamsMetadata` for the state store named "word-count" that we defined in the code example shown in the previous section: diff --git a/docs/streams/developer-guide/processor-api.md b/docs/streams/developer-guide/processor-api.md index 4273394884c3c..c67c42fcb3a15 100644 --- a/docs/streams/developer-guide/processor-api.md +++ b/docs/streams/developer-guide/processor-api.md @@ -39,7 +39,7 @@ The Processor API can be used to implement both **stateless** as well as **state **Combining the DSL and the Processor API:** You can combine the convenience of the DSL with the power and flexibility of the Processor API as described in the section [Applying processors (Processor API integration)](dsl-api.html#streams-developer-guide-dsl-process). -For a complete list of available API functionality, see the [Streams](/43/javadoc/org/apache/kafka/streams/package-summary.html) API docs. +For a complete list of available API functionality, see the [Streams](/{version}/javadoc/org/apache/kafka/streams/package-summary.html) API docs. # Defining a Stream Processor @@ -173,11 +173,11 @@ Yes (enabled by default) * Stores its data on local disk. * Storage capacity: managed local state can be larger than the memory (heap space) of an application instance, but must fit into the available local disk space. * RocksDB settings can be fine-tuned, see [RocksDB configuration](config-streams.html#streams-developer-guide-rocksdb-config). - * Available [store variants](/43/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore\(java.lang.String\)): timestamped key-value store, versioned key-value store, time window key-value store, session window key-value store. - * Use [persistentTimestampedKeyValueStore](/43/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore\(java.lang.String\)) when you need a persistent key-(value/timestamp) store that supports put/get/delete and range queries. - * Use [persistentVersionedKeyValueStore](/43/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore\(java.lang.String,java.time.Duration\)) when you need a persistent, versioned key-(value/timestamp) store that supports put/get/delete and timestamped get operations. - * Use [persistentWindowStore](/43/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\)) or [persistentTimestampedWindowStore](/43/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\)) when you need a persistent timeWindowedKey-value or timeWindowedKey-(value/timestamp) store, respectively. - * Use [persistentSessionStore](/43/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStore\(java.lang.String,java.time.Duration\)) when you need a persistent sessionWindowedKey-value store. + * Available [store variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore\(java.lang.String\)): timestamped key-value store, versioned key-value store, time window key-value store, session window key-value store. + * Use [persistentTimestampedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore\(java.lang.String\)) when you need a persistent key-(value/timestamp) store that supports put/get/delete and range queries. 
+ * Use [persistentVersionedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore\(java.lang.String,java.time.Duration\)) when you need a persistent, versioned key-(value/timestamp) store that supports put/get/delete and timestamped get operations. + * Use [persistentWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\)) or [persistentTimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\)) when you need a persistent timeWindowedKey-value or timeWindowedKey-(value/timestamp) store, respectively. + * Use [persistentSessionStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStore\(java.lang.String,java.time.Duration\)) when you need a persistent sessionWindowedKey-value store. @@ -217,9 +217,9 @@ Yes (enabled by default) * Stores its data in memory. * Storage capacity: managed local state must fit into memory (heap space) of an application instance. * Useful when application instances run in an environment where local disk space is either not available or local disk space is wiped in-between app instance restarts. - * Available [store variants](/43/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryKeyValueStore-java.lang.String-): time window key-value store, session window key-value store. - * Use [TimestampedKeyValueStore](/43/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html) when you need a key-(value/timestamp) store that supports put/get/delete and range queries. - * Use [TimestampedWindowStore](/43/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html) when you need to store windowedKey-(value/timestamp) pairs. + * Available [store variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryKeyValueStore-java.lang.String-): time window key-value store, session window key-value store. + * Use [TimestampedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html) when you need a key-(value/timestamp) store that supports put/get/delete and range queries. + * Use [TimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html) when you need to store windowedKey-(value/timestamp) pairs. * There is no built-in in-memory, versioned key-value store at this time. @@ -297,7 +297,7 @@ You can query timestamped state stores both with and without a timestamp. * For Processor API users, nothing changes in existing applications, and you have the option of using the timestamped stores. * For DSL operators, store data is upgraded lazily in the background. - * No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you can opt-in by implementing the [TimestampedBytesStore](/43/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html) interface. In this case, the old format is retained, and Streams uses a proxy store that removes/adds timestamps on read/write. + * No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you can opt-in by implementing the [TimestampedBytesStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html) interface. In this case, the old format is retained, and Streams uses a proxy store that removes/adds timestamps on read/write. 
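As a small sketch of the store variants listed above — the store name and serdes are assumptions — a persistent timestamped key-value store can be obtained from the `Stores` factory and wrapped in a `StoreBuilder`:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.TimestampedKeyValueStore;

    public class TimestampedStoreSketch {
        public static StoreBuilder<TimestampedKeyValueStore<String, Long>> countsStoreBuilder() {
            // Persistent, RocksDB-backed supplier; "counts-store" is a hypothetical name.
            KeyValueBytesStoreSupplier supplier =
                Stores.persistentTimestampedKeyValueStore("counts-store");

            // The builder fixes the key and value serdes used by the store.
            return Stores.timestampedKeyValueStoreBuilder(supplier, Serdes.String(), Serdes.Long());
        }
    }

The same pattern applies to the windowed, session, and versioned variants, each via its corresponding `Stores` factory method.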
@@ -305,7 +305,7 @@ You can query timestamped state stores both with and without a timestamp. Versioned key-value state stores are available since Kafka Streams 3.5. Rather than storing a single record version (value and timestamp) per key, versioned state stores may store multiple record versions per key. This allows versioned state stores to support timestamped retrieval operations to return the latest record (per key) as of a specified timestamp. -You can create a persistent, versioned state store by passing a [VersionedBytesStoreSupplier](/43/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore\(java.lang.String,java.time.Duration\)) to the [versionedKeyValueStoreBuilder](/43/javadoc/org/apache/kafka/streams/state/Stores.html#versionedKeyValueStoreBuilder\(java.lang.String,java.time.Duration\)), or by implementing your own [VersionedKeyValueStore](/43/javadoc/org/apache/kafka/streams/state/VersionedKeyValueStore.html). +You can create a persistent, versioned state store by passing a [VersionedBytesStoreSupplier](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore\(java.lang.String,java.time.Duration\)) to the [versionedKeyValueStoreBuilder](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#versionedKeyValueStoreBuilder\(java.lang.String,java.time.Duration\)), or by implementing your own [VersionedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/VersionedKeyValueStore.html). Each versioned store has an associated, fixed-duration _history retention_ parameter which specifies long old record versions should be kept for. In particular, a versioned store guarantees to return accurate results for timestamped retrieval operations where the timestamp being queried is within history retention of the current observed stream time. @@ -317,7 +317,7 @@ Versioned stores do not support caching or interactive queries at this time. Als **Upgrade note:** Versioned state stores are opt-in only; no automatic upgrades from non-versioned to versioned stores will take place. -Upgrades are supported from persistent, non-versioned key-value stores to persistent, versioned key-value stores as long as the original store has the same changelog topic format as the versioned store being upgraded to. Both persistent [key-value stores](/43/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore\(java.lang.String\)) and [timestamped key-value stores](/43/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore\(java.lang.String\)) share the same changelog topic format as [persistent versioned key-value stores](/43/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore\(java.lang.String,java.time.Duration\)), and therefore both are eligible for upgrades. +Upgrades are supported from persistent, non-versioned key-value stores to persistent, versioned key-value stores as long as the original store has the same changelog topic format as the versioned store being upgraded to. 
Both persistent [key-value stores](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore\(java.lang.String\)) and [timestamped key-value stores](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore\(java.lang.String\)) share the same changelog topic format as [persistent versioned key-value stores](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore\(java.lang.String,java.time.Duration\)), and therefore both are eligible for upgrades. If you wish to upgrade an application using persistent, non-versioned key-value stores to use persistent, versioned key-value stores instead, you can perform the following procedure: @@ -338,7 +338,7 @@ A read-only state store materialized the data from its input topic. It also uses You can use the built-in state store types or implement your own. The primary interface to implement for the store is `org.apache.kafka.streams.processor.StateStore`. Kafka Streams also has a few extended interfaces such as `KeyValueStore` and `VersionedKeyValueStore`. -Note that your customized `org.apache.kafka.streams.processor.StateStore` implementation also needs to provide the logic on how to restore the state via the `org.apache.kafka.streams.processor.StateRestoreCallback` or `org.apache.kafka.streams.processor.BatchingStateRestoreCallback` interface. Details on how to instantiate these interfaces can be found in the [javadocs](/43/javadoc/org/apache/kafka/streams/processor/StateStore.html). +Note that your customized `org.apache.kafka.streams.processor.StateStore` implementation also needs to provide the logic on how to restore the state via the `org.apache.kafka.streams.processor.StateRestoreCallback` or `org.apache.kafka.streams.processor.BatchingStateRestoreCallback` interface. Details on how to instantiate these interfaces can be found in the [javadocs](/{version}/javadoc/org/apache/kafka/streams/processor/StateStore.html). You also need to provide a "builder" for the store by implementing the `org.apache.kafka.streams.state.StoreBuilder` interface, which Kafka Streams uses to create instances of your store.
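As a minimal sketch of the wiring described above — it uses the built-in RocksDB-backed store from `Stores` rather than a custom `StateStore`/`StoreBuilder` implementation, and the topic, store, and processor names are assumptions — a store builder can be attached to a Processor API topology and accessed from a processor's `init()` method:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    public class StoreWiringSketch {

        public static Topology build() {
            Topology topology = new Topology();
            topology.addSource("Source", "input-topic");                  // hypothetical topic
            topology.addProcessor("Count", CountProcessor::new, "Source");
            // The StoreBuilder creates the store; connecting it to "Count" lets the
            // processor retrieve it by name from its context.
            topology.addStateStore(
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("counts"),             // hypothetical store name
                    Serdes.String(), Serdes.Long()),
                "Count");
            topology.addSink("Sink", "output-topic",                      // hypothetical topic
                Serdes.String().serializer(), Serdes.Long().serializer(), "Count");
            return topology;
        }

        static class CountProcessor implements Processor<String, String, String, Long> {
            private ProcessorContext<String, Long> context;
            private KeyValueStore<String, Long> store;

            @Override
            public void init(ProcessorContext<String, Long> context) {
                this.context = context;
                this.store = context.getStateStore("counts");
            }

            @Override
            public void process(Record<String, String> record) {
                if (record.key() == null) {
                    return; // skip records without a key
                }
                Long previous = store.get(record.key());
                long updated = (previous == null ? 0L : previous) + 1L;
                store.put(record.key(), updated);
                context.forward(record.withValue(updated));
            }
        }
    }

A custom store follows the same shape: the `StoreBuilder` you implement is what you pass to `addStateStore()`, and the store it builds must supply its restore logic via the `StateRestoreCallback` interfaces noted above.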