diff --git a/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md b/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md
index acd98b337c7..1067fa72c4e 100644
--- a/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md
+++ b/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md
@@ -78,16 +78,20 @@ spec:
| `consumeRetryEnabled` | N | Input/Output | Enable consume retry by setting to `"true"`. Defaults to `false` in the Kafka binding component. | `"true"`, `"false"` |
| `publishTopic` | Y | Output | The topic to publish to. | `"mytopic"` |
| `authRequired` | N | *Deprecated* | Enable [SASL](https://en.wikipedia.org/wiki/Simple_Authentication_and_Security_Layer) authentication with the Kafka brokers. | `"true"`, `"false"` |
-| `authType` | Y | Input/Output | Configure or disable authentication. Supported values: `none`, `password`, `mtls`, or `oidc` | `"password"`, `"none"` |
+| `authType` | Y | Input/Output | Configure or disable authentication. Supported values: `none`, `password`, `mtls`, `oidc`, or `oidc_private_key_jwt` | `"password"`, `"none"` |
| `saslUsername` | N | Input/Output | The SASL username used for authentication. Only required if `authType` is set to `"password"`. | `"adminuser"` |
| `saslPassword` | N | Input/Output | The SASL password used for authentication. Can be `secretKeyRef` to use a [secret reference]({{% ref component-secrets.md %}}). Only required if `authType` is set to `"password"`. | `""`, `"KeFg23!"` |
| `saslMechanism` | N | Input/Output | The SASL authentication mechanism you'd like to use. Only required if `authType` is set to `"password"`. If not provided, defaults to `PLAINTEXT`, which can break some services, such as Amazon Managed Streaming for Apache Kafka. | `"SHA-512", "SHA-256", "PLAINTEXT"` |
| `initialOffset` | N | Input | The initial offset to use if no offset was previously committed. Should be `"newest"` or `"oldest"`. Defaults to `"newest"`. | `"oldest"` |
| `maxMessageBytes` | N | Input/Output | The maximum size in bytes allowed for a single Kafka message. Defaults to 1024. | `"2048"` |
-| `oidcTokenEndpoint` | N | Input/Output | Full URL to an OAuth2 identity provider access token endpoint. Required when `authType` is set to `oidc` | "https://identity.example.com/v1/token" |
-| `oidcClientID` | N | Input/Output | The OAuth2 client ID that has been provisioned in the identity provider. Required when `authType` is set to `oidc` | `"dapr-kafka"` |
+| `oidcTokenEndpoint` | N | Input/Output | Full URL to an OAuth2 identity provider access token endpoint. Required when `authType` is set to `oidc` or `oidc_private_key_jwt` | `"https://identity.example.com/v1/token"` |
+| `oidcClientID` | N | Input/Output | The OAuth2 client ID that has been provisioned in the identity provider. Required when `authType` is set to `oidc` or `oidc_private_key_jwt` | `"dapr-kafka"` |
| `oidcClientSecret` | N | Input/Output | The OAuth2 client secret that has been provisioned in the identity provider. Required when `authType` is set to `oidc` | `"KeFg23!"` |
-| `oidcScopes` | N | Input/Output | Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Recommended when `authType` is set to `oidc`. Defaults to `"openid"` | `"openid,kafka-prod"` |
+| `oidcScopes` | N | Input/Output | Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Recommended when `authType` is set to `oidc` or `oidc_private_key_jwt`. Defaults to `"openid"` | `"openid,kafka-prod"` |
+| `oidcClientAssertionCert` | N | Input/Output | The OAuth2 client assertion certificate used for authentication. Required when `authType` is set to `oidc_private_key_jwt`. Can be `secretKeyRef` to use a secret reference | `"-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----"` |
+| `oidcClientAssertionKey` | N | Input/Output | The OAuth2 client assertion key used for authentication. Required when `authType` is set to `oidc_private_key_jwt`. Can be `secretKeyRef` to use a secret reference | `"-----BEGIN RSA PRIVATE KEY-----\n...\n-----END RSA PRIVATE KEY-----"` |
+| `oidcResource` | N | Input/Output | The OAuth2 resource to request with the access token. Recommended when `authType` is set to `oidc_private_key_jwt`. | `"api://kafka"` |
+| `oidcAudience` | N | Input/Output | The OAuth2 audience to request with the access token. Recommended when `authType` is set to `oidc_private_key_jwt`. | `"https://identity.example.com/realms/local"` |
| `version` | N | Input/Output | Kafka cluster version. Defaults to 2.0.0. Note that this must be set to `1.0.0` when using Azure Event Hubs with Kafka. | `"1.0.0"` |
| `direction` | N | Input/Output | The direction of the binding. | `"input"`, `"output"`, `"input, output"` |
| `oidcExtensions` | N | Input/Output | String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token | `{"cluster":"kafka","poolid":"kafkapool"}` |
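+
+For example, a Kafka binding configured for `oidc_private_key_jwt` might look like the following sketch; the broker address, topic names, and the `kafka-tls` secret are illustrative assumptions, not values defined elsewhere in these docs:
+
+```yaml
+apiVersion: dapr.io/v1alpha1
+kind: Component
+metadata:
+  name: kafka-binding
+spec:
+  type: bindings.kafka
+  version: v1
+  metadata:
+  - name: brokers # Required. Kafka broker connection setting
+    value: "dapr-kafka.myapp.svc.cluster.local:9092"
+  - name: topics # Input binding topic
+    value: "topic1"
+  - name: publishTopic # Output binding topic
+    value: "topic1"
+  - name: authType # Required.
+    value: "oidc_private_key_jwt"
+  - name: oidcTokenEndpoint # Required for this authType.
+    value: "https://identity.example.com/v1/token"
+  - name: oidcClientID # Required for this authType.
+    value: "dapr-kafka"
+  - name: oidcClientAssertionCert # Required for this authType.
+    secretKeyRef:
+      name: kafka-tls # assumed secret name
+      key: oidcClientAssertionCert
+  - name: oidcClientAssertionKey # Required for this authType.
+    secretKeyRef:
+      name: kafka-tls
+      key: oidcClientAssertionKey
+  - name: oidcScopes # Recommended.
+    value: "openid,kafka-prod"
+```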
diff --git a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
index 8e4e95d74ea..d4d59c8e034 100644
--- a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
+++ b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
@@ -75,7 +75,7 @@ spec:
    value: "true"
  - name: escapeHeaders # Optional.
    value: false
-  
+
```

> For details on using `secretKeyRef`, see the guide on [how to reference secrets in components]({{% ref component-secrets.md %}}).
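+
+For instance, a hypothetical sketch of such a reference (assuming a Kubernetes secret named `kafka-secrets` that holds the SASL password under the key `saslPasswordValue`):
+
+```yaml
+spec:
+  type: pubsub.kafka
+  version: v1
+  metadata:
+  - name: saslPassword
+    secretKeyRef:
+      name: kafka-secrets # assumed secret name
+      key: saslPasswordValue # assumed key within the secret
+auth:
+  secretStore: kubernetes # omit in Kubernetes mode to use the default secret store
+```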
@@ -89,7 +89,7 @@
| consumerID | N | Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the `consumerID` is not provided, the Dapr runtime sets it to the Dapr application ID (`appID`) value. If a value for `consumerGroup` is provided, any value for `consumerID` is ignored - a combination of the consumer group and a random unique identifier will be set for the `consumerID` instead. | Can be set to string value (such as `"channel1"` in the example above) or string format value (such as `"{podName}"`, etc.). [See all of the template tags you can use in your component metadata.]({{% ref "component-schema.md#templated-metadata-values" %}})
| clientID | N | A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes. Defaults to `"namespace.appID"` for Kubernetes mode or `"appID"` for Self-Hosted mode. | `"my-namespace.my-dapr-app"`, `"my-dapr-app"`
| authRequired | N | *Deprecated* Enable [SASL](https://en.wikipedia.org/wiki/Simple_Authentication_and_Security_Layer) authentication with the Kafka brokers. | `"true"`, `"false"`
-| authType | Y | Configure or disable authentication. Supported values: `none`, `password`, `mtls`, `oidc` or `awsiam` | `"password"`, `"none"`
+| authType | Y | Configure or disable authentication. Supported values: `none`, `password`, `mtls`, `oidc`, `oidc_private_key_jwt`, or `awsiam` | `"password"`, `"none"`
| saslUsername | N | The SASL username used for authentication. Only required if `authType` is set to `"password"`. | `"adminuser"`
| saslPassword | N | The SASL password used for authentication. Can be `secretKeyRef` to use a [secret reference]({{% ref component-secrets.md %}}). Only required if `authType` is set to `"password"`. | `""`, `"KeFg23!"`
| saslMechanism | N | The SASL authentication mechanism you wish to use. Only required if `authType` is set to `"password"`. Defaults to `PLAINTEXT` | `"SHA-512", "SHA-256", "PLAINTEXT"`
@@ -103,10 +103,14 @@
| clientKey | N | Client key, required for `authType` `mtls`. Can be `secretKeyRef` to use a secret reference | `"-----BEGIN RSA PRIVATE KEY-----\n\n-----END RSA PRIVATE KEY-----"`
| skipVerify | N | Skip TLS verification; this is not recommended for use in production. Defaults to `"false"` | `"true"`, `"false"` |
| disableTls | N | Disable TLS for transport security. Set to `"true"` to disable; this is not recommended for use in production. Defaults to `"false"`. | `"true"`, `"false"` |
-| oidcTokenEndpoint | N | Full URL to an OAuth2 identity provider access token endpoint. Required when `authType` is set to `oidc` | "https://identity.example.com/v1/token" |
-| oidcClientID | N | The OAuth2 client ID that has been provisioned in the identity provider. Required when `authType` is set to `oidc` | `dapr-kafka` |
+| oidcTokenEndpoint | N | Full URL to an OAuth2 identity provider access token endpoint. Required when `authType` is set to `oidc` or `oidc_private_key_jwt` | `"https://identity.example.com/v1/token"` |
+| oidcClientID | N | The OAuth2 client ID that has been provisioned in the identity provider. Required when `authType` is set to `oidc` or `oidc_private_key_jwt` | `dapr-kafka` |
| oidcClientSecret | N | The OAuth2 client secret that has been provisioned in the identity provider. Required when `authType` is set to `oidc` | `"KeFg23!"` |
-| oidcScopes | N | Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Recommended when `authType` is set to `oidc`. Defaults to `"openid"` | `"openid,kafka-prod"` |
+| oidcScopes | N | Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Recommended when `authType` is set to `oidc` or `oidc_private_key_jwt`. Defaults to `"openid"` | `"openid,kafka-prod"` |
+| oidcClientAssertionCert | N | The OAuth2 client assertion certificate used for authentication. Required when `authType` is set to `oidc_private_key_jwt`. Can be `secretKeyRef` to use a secret reference | `"-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----"` |
+| oidcClientAssertionKey | N | The OAuth2 client assertion key used for authentication. Required when `authType` is set to `oidc_private_key_jwt`. Can be `secretKeyRef` to use a secret reference | `"-----BEGIN RSA PRIVATE KEY-----\n...\n-----END RSA PRIVATE KEY-----"` |
+| oidcResource | N | The OAuth2 resource to request with the access token. Recommended when `authType` is set to `oidc_private_key_jwt`. | `"api://kafka"` |
+| oidcAudience | N | The OAuth2 audience to request with the access token. Recommended when `authType` is set to `oidc_private_key_jwt`. | `"https://identity.example.com/realms/local"` |
| `"http:///realms/local"` | | oidcExtensions | N | String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token | `{"cluster":"kafka","poolid":"kafkapool"}` | | awsRegion | N | This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'region' instead. The AWS region where the Kafka cluster is deployed to. Required when `authType` is set to `awsiam` | `us-west-1` | | awsAccessKey | N | This maintains backwards compatibility with existing fields. It will be deprecated as of Dapr 1.17. Use 'accessKey' instead. AWS access key associated with an IAM account. | `"accessKey"` @@ -140,12 +144,13 @@ The metadata `version` must be set to `1.0.0` when using Azure EventHubs with Ka Kafka supports a variety of authentication schemes and Dapr supports several: SASL password, mTLS, OIDC/OAuth2. With the added authentication methods, the `authRequired` field has been deprecated from the v1.6 release and instead the `authType` field should be used. If `authRequired` is set to `true`, Dapr will attempt to configure `authType` correctly -based on the value of `saslPassword`. The valid values for `authType` are: +based on the value of `saslPassword`. The valid values for `authType` are: - `none` - `password` - `certificate` - `mtls` -- `oidc` +- `oidc` +- `oidc_private_key_jwt` - `awsiam` {{% alert title="Note" color="primary" %}} @@ -335,6 +340,70 @@ spec: value: 0.10.2.0 ``` +#### OAuth2 Private Key JWT + +Setting `authType` to `oidc_private_key_jwt` enables SASL authentication via the **OAUTHBEARER** mechanism. This supports specifying a private key JWT from an external OAuth2 or [OIDC](https://en.wikipedia.org/wiki/OpenID) identity provider. Currently, only the **client_credentials** grant is supported. + +Configure `oidcTokenEndpoint` to the full URL for the identity provider access token endpoint. + +Set `oidcClientID` to the client ID, `oidcClientAssertionCert` to the client assertion certificate and `oidcClientAssertionKey` to the client assertion key provisioned in the identity provider. + +If `caCert` is specified in the component configuration, the certificate is appended to the system CA trust for verifying the identity provider certificate. Similarly, if `skipVerify` is specified in the component configuration, verification will also be skipped when accessing the identity provider. + +By default, the only scope requested for the token is `openid`; it is **highly** recommended that additional scopes be specified via `oidcScopes` in a comma-separated list and validated by the Kafka broker. If additional scopes are not used to narrow the validity of the access token, +a compromised Kafka broker could replay the token to access other services as the Dapr clientID. + +```yaml +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: kafka-pubsub +spec: + type: pubsub.kafka + version: v1 + metadata: + - name: brokers # Required. Kafka broker connection setting + value: "dapr-kafka.myapp.svc.cluster.local:9092" + - name: consumerGroup # Optional. Used for input bindings. + value: "group1" + - name: clientID # Optional. Used as client tracing ID by Kafka brokers. + value: "my-dapr-app-id" + - name: authType # Required. + value: "oidc_private_key_jwt" + - name: oidcTokenEndpoint # Required if authType is `oidc_private_key_jwt`. + value: "https://identity.example.com/v1/token" + - name: oidcClientID # Required if authType is `oidc_private_key_jwt`. 
#### AWS IAM

Authenticating with AWS IAM is supported with MSK. Setting `authType` to `awsiam` uses the AWS SDK to generate auth tokens to authenticate.
@@ -418,9 +487,9 @@ auth:

## Consuming from multiple topics

-When consuming from multiple topics using a single pub/sub component, there is no guarantee about how the consumers in your consumer group are balanced across the topic partitions. 
+When consuming from multiple topics using a single pub/sub component, there is no guarantee about how the consumers in your consumer group are balanced across the topic partitions.

-For instance, let's say you are subscribing to two topics with 10 partitions per topic and you have 20 replicas of your service consuming from the two topics. There is no guarantee that 10 will be assigned to the first topic and 10 to the second topic. Instead, the partitions could be divided unequally, with more than 10 assigned to the first topic and the rest assigned to the second topic. 
+For instance, let's say you are subscribing to two topics with 10 partitions per topic and you have 20 replicas of your service consuming from the two topics. There is no guarantee that 10 will be assigned to the first topic and 10 to the second topic. Instead, the partitions could be divided unequally, with more than 10 assigned to the first topic and the rest assigned to the second topic.

This can result in idle consumers listening to the first topic and over-extended consumers on the second topic, or vice versa. This same behavior can be observed when using auto-scalers such as HPA or KEDA.
@@ -475,7 +544,7 @@ Apache Kafka supports the following bulk metadata options:

When invoking the Kafka pub/sub, it's possible to provide an optional partition key by using the `metadata` query param in the request URL.

-The param name can either be `partitionKey` or `__key` 
+The param name can either be `partitionKey` or `__key`

Example:
@@ -552,7 +621,7 @@ app.include_router(router)

## Receiving message headers with special characters

-The consumer application may be required to receive message headers that include special characters, which may cause HTTP protocol validation errors. 
+The consumer application may be required to receive message headers that include special characters, which may cause HTTP protocol validation errors.
HTTP header values must follow specifications, which disallow some characters. [Learn more about the protocols](https://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2). In this case, you can enable the `escapeHeaders` configuration setting, which uses URL escaping to encode header values on the consumer side.
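+
+For example, an illustrative fragment enabling it (only the `escapeHeaders` entry matters here; the rest of the component definition is elided):
+
+```yaml
+  metadata:
+  - name: escapeHeaders
+    value: "true"
+```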
@@ -593,13 +662,13 @@ Currently, only message value serialization/deserialization is supported. Since

Please note that `rawPayload=true` should NOT be set for consumers, as the message value will be wrapped into a CloudEvent and base64-encoded. Leaving `rawPayload` as default (i.e. `false`) will send the Avro-decoded message to the application as a JSON payload.

-When setting the `useAvroJson` component metadata to `true`, the inbound/outbound Avro binary is converted into/from Avro JSON encoding. 
-This can be preferable when accurate type mapping is desirable. 
-The default is standard JSON which is typically easier to bind to a native type in an application. 
+When setting the `useAvroJson` component metadata to `true`, the inbound/outbound Avro binary is converted into/from Avro JSON encoding.
+This can be preferable when accurate type mapping is desirable.
+The default is standard JSON, which is typically easier to bind to a native type in an application.
{{% /alert %}}

When configuring the Kafka pub/sub component metadata, you must define:
-- The schema registry URL 
+- The schema registry URL
- The API key/secret, if applicable

Schema subjects are automatically derived from topic names, using the standard naming convention. For example, for a topic named `my-topic`, the schema subject will be `my-topic-value`.
@@ -679,16 +748,16 @@ app.include_router(router)
```

{{% /tab %}}

-{{< /tabpane >}} 
+{{< /tabpane >}}

### Avoiding downstream side effects when publishing messages requiring custom metadata

-Dapr allows customizing the publishing behavior by setting custom publish metadata. 
+Dapr allows customizing the publishing behavior by setting custom publish metadata.

For instance, to publish in Avro format, you must set the `valueSchemaType=Avro` metadata.
However, by default these metadata items get converted to Kafka headers and published along with the message.
This default behavior is helpful, for instance, for forwarding tracing headers across a chain of publishers/consumers.
-In certain scenario, however, it has unwanted side effects. 
+In certain scenarios, however, it has unwanted side effects.

Let's assume you consume an Avro message using Dapr with the headers above. If this message cannot be consumed successfully and is configured to be sent to a dead letter topic, `valueSchemaType=Avro` is automatically carried forward when publishing to the dead letter topic, requiring a schema to be set up for that topic as well. In many scenarios, it is preferable to publish dead letter messages as plain JSON, as conforming to a fixed schema is not always possible.
To avoid this behavior, the kafka-pubsub component can be configured to exclude certain metadata keys from being converted to/from headers.
@@ -709,24 +778,24 @@ metadata:
```

### Overriding default consumer group rebalancing
-In Kafka, rebalancing strategies determine how partitions are assigned to consumers within a consumer group. The default strategy is "range", but "roundrobin" and "sticky" are also available. 
+In Kafka, rebalancing strategies determine how partitions are assigned to consumers within a consumer group. The default strategy is "range", but "roundrobin" and "sticky" are also available.
- `Range`:
-Partitions are assigned to consumers based on their lexicographical order. 
-If you have three partitions (0, 1, 2) and two consumers (A, B), consumer A might get partitions 0 and 1, while consumer B gets partition 2. 
+Partitions are assigned to consumers based on their lexicographical order.
+If you have three partitions (0, 1, 2) and two consumers (A, B), consumer A might get partitions 0 and 1, while consumer B gets partition 2.
- `RoundRobin`:
-Partitions are assigned to consumers in a round-robin fashion. 
-With the same example above, consumer A might get partitions 0 and 2, while consumer B gets partition 1. 
+Partitions are assigned to consumers in a round-robin fashion.
+With the same example above, consumer A might get partitions 0 and 2, while consumer B gets partition 1.
- `Sticky`:
-This strategy aims to preserve previous assignments as much as possible while still maintaining a balanced distribution. 
-If a consumer leaves or joins the group, only the affected partitions are reassigned, minimizing disruption. 
+This strategy aims to preserve previous assignments as much as possible while still maintaining a balanced distribution.
+If a consumer leaves or joins the group, only the affected partitions are reassigned, minimizing disruption.

#### Choosing a Strategy:
- `Range`:
-Simple to understand and implement, but can lead to uneven distribution if partition sizes vary significantly. 
+Simple to understand and implement, but can lead to uneven distribution if partition sizes vary significantly.
- `RoundRobin`:
-Provides a good balance in many cases, but might not be optimal if message keys are unevenly distributed. 
+Provides a good balance in many cases, but might not be optimal if message keys are unevenly distributed.
- `Sticky`:
-Generally preferred for its ability to minimize disruption during rebalances, especially when dealing with a large number of partitions or frequent consumer group changes. 
+Generally preferred for its ability to minimize disruption during rebalances, especially when dealing with a large number of partitions or frequent consumer group changes.
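+
+To override the default, a hedged sketch of the relevant component metadata entry (assuming the strategy is exposed through a `consumerGroupRebalanceStrategy` field; check the metadata table above for the authoritative name and values):
+
+```yaml
+  metadata:
+  - name: consumerGroupRebalanceStrategy # assumed field name
+    value: "sticky" # assumed values: "range" (default), "roundrobin", "sticky"
+```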

## Create a Kafka instance