diff --git a/changelog/8892.feature.md b/changelog/8892.feature.md new file mode 100644 index 000000000000..cca9f89f1477 --- /dev/null +++ b/changelog/8892.feature.md @@ -0,0 +1 @@ +Added `sasl_mechanism` as an optional configurable parameters for the [Kafka Producer](event-brokers.mdx#kafka-event-broker). \ No newline at end of file diff --git a/data/test_endpoints/event_brokers/kafka_invalid_sasl_mechanism.yml b/data/test_endpoints/event_brokers/kafka_invalid_sasl_mechanism.yml new file mode 100644 index 000000000000..fe72766c406c --- /dev/null +++ b/data/test_endpoints/event_brokers/kafka_invalid_sasl_mechanism.yml @@ -0,0 +1,12 @@ +event_broker: + type: kafka + security_protocol: SASL_SSL + topic: topic + url: localhost + sasl_username: username + sasl_password: password + sasl_mechanism: SOMETHING + ssl_cafile: CARoot.pem + ssl_certfile: certificate.pem + ssl_keyfile: key.pem + ssl_check_hostname: True diff --git a/data/test_endpoints/event_brokers/kafka_sasl_plaintext_endpoint.yml b/data/test_endpoints/event_brokers/kafka_sasl_plaintext_endpoint.yml index fe46dc2e4b30..7b9cb32dd863 100644 --- a/data/test_endpoints/event_brokers/kafka_sasl_plaintext_endpoint.yml +++ b/data/test_endpoints/event_brokers/kafka_sasl_plaintext_endpoint.yml @@ -6,3 +6,4 @@ event_broker: partition_by_sender: True sasl_username: username sasl_password: password + sasl_mechanism: PLAIN diff --git a/data/test_endpoints/event_brokers/kafka_sasl_ssl_endpoint.yml b/data/test_endpoints/event_brokers/kafka_sasl_ssl_endpoint.yml index 3fceb230ae3f..1d2890bae421 100644 --- a/data/test_endpoints/event_brokers/kafka_sasl_ssl_endpoint.yml +++ b/data/test_endpoints/event_brokers/kafka_sasl_ssl_endpoint.yml @@ -5,6 +5,7 @@ event_broker: url: localhost sasl_username: username sasl_password: password + sasl_mechanism: PLAIN ssl_cafile: CARoot.pem ssl_certfile: certificate.pem ssl_keyfile: key.pem diff --git a/rasa/core/brokers/kafka.py b/rasa/core/brokers/kafka.py index 20ac097aaa0e..fd987471c4b5 100644 --- a/rasa/core/brokers/kafka.py +++ b/rasa/core/brokers/kafka.py @@ -7,11 +7,18 @@ from rasa.core.brokers.broker import EventBroker from rasa.shared.utils.io import DEFAULT_ENCODING from rasa.utils.endpoints import EndpointConfig +from rasa.shared.exceptions import RasaException logger = logging.getLogger(__name__) +class KafkaProducerInitializationError(RasaException): + """Raised if the Kafka Producer cannot be properly initialized.""" + + class KafkaEventBroker(EventBroker): + """Kafka event broker.""" + def __init__( self, url: Union[Text, List[Text], None], @@ -20,6 +27,7 @@ def __init__( partition_by_sender: bool = False, sasl_username: Optional[Text] = None, sasl_password: Optional[Text] = None, + sasl_mechanism: Optional[Text] = "PLAIN", ssl_cafile: Optional[Text] = None, ssl_certfile: Optional[Text] = None, ssl_keyfile: Optional[Text] = None, @@ -45,6 +53,10 @@ def __init__( sender_id or not sasl_username: Username for plain authentication. sasl_password: Password for plain authentication. + sasl_mechanism: Authentication mechanism when security_protocol is + configured for SASL_PLAINTEXT or SASL_SSL. + Valid values are: PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, + SCRAM-SHA-512. Default: `PLAIN` ssl_cafile: Optional filename of ca file to use in certificate verification. ssl_certfile: Optional filename of file in pem format containing @@ -68,6 +80,7 @@ def __init__( self.security_protocol = security_protocol.upper() self.sasl_username = sasl_username self.sasl_password = sasl_password + self.sasl_mechanism = sasl_mechanism self.ssl_cafile = ssl_cafile self.ssl_certfile = ssl_certfile self.ssl_keyfile = ssl_keyfile @@ -129,28 +142,18 @@ def _create_producer(self) -> None: import kafka if self.security_protocol == "PLAINTEXT": - self.producer = kafka.KafkaProducer( - client_id=self.client_id, - bootstrap_servers=self.url, - value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING), - security_protocol=self.security_protocol, - ssl_check_hostname=False, + authentication_params = dict( + security_protocol=self.security_protocol, ssl_check_hostname=False, ) elif self.security_protocol == "SASL_PLAINTEXT": - self.producer = kafka.KafkaProducer( - client_id=self.client_id, - bootstrap_servers=self.url, - value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING), + authentication_params = dict( sasl_plain_username=self.sasl_username, sasl_plain_password=self.sasl_password, - sasl_mechanism="PLAIN", + sasl_mechanism=self.sasl_mechanism, security_protocol=self.security_protocol, ) elif self.security_protocol == "SSL": - self.producer = kafka.KafkaProducer( - client_id=self.client_id, - bootstrap_servers=self.url, - value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING), + authentication_params = dict( ssl_cafile=self.ssl_cafile, ssl_certfile=self.ssl_certfile, ssl_keyfile=self.ssl_keyfile, @@ -158,24 +161,32 @@ def _create_producer(self) -> None: security_protocol=self.security_protocol, ) elif self.security_protocol == "SASL_SSL": - self.producer = kafka.KafkaProducer( - client_id=self.client_id, - bootstrap_servers=self.url, - value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING), + authentication_params = dict( sasl_plain_username=self.sasl_username, sasl_plain_password=self.sasl_password, + sasl_mechanism=self.sasl_mechanism, ssl_cafile=self.ssl_cafile, ssl_certfile=self.ssl_certfile, ssl_keyfile=self.ssl_keyfile, ssl_check_hostname=self.ssl_check_hostname, security_protocol=self.security_protocol, - sasl_mechanism="PLAIN", ) else: raise ValueError( f"Cannot initialise `KafkaEventBroker`: " f"Invalid `security_protocol` ('{self.security_protocol}')." ) + try: + self.producer = kafka.KafkaProducer( + client_id=self.client_id, + bootstrap_servers=self.url, + value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING), + **authentication_params, + ) + except AssertionError as e: + raise KafkaProducerInitializationError( + f"Cannot initialise `KafkaEventBroker`: {e}" + ) def _publish(self, event: Dict[Text, Any]) -> None: if self.partition_by_sender: diff --git a/tests/core/test_broker.py b/tests/core/test_broker.py index 7e008707fd52..c53af4190ed6 100644 --- a/tests/core/test_broker.py +++ b/tests/core/test_broker.py @@ -19,7 +19,7 @@ import rasa.utils.io from rasa.core.brokers.broker import EventBroker from rasa.core.brokers.file import FileEventBroker -from rasa.core.brokers.kafka import KafkaEventBroker +from rasa.core.brokers.kafka import KafkaEventBroker, KafkaProducerInitializationError from rasa.core.brokers.pika import PikaEventBroker, DEFAULT_QUEUE_NAME from rasa.core.brokers.sql import SQLEventBroker from rasa.shared.core.events import Event, Restarted, SlotSet, UserUttered @@ -259,6 +259,7 @@ async def test_kafka_broker_from_config(): "localhost", sasl_username="username", sasl_password="password", + sasl_mechanism="PLAIN", topic="topic", partition_by_sender=True, security_protocol="SASL_PLAINTEXT", @@ -267,6 +268,7 @@ async def test_kafka_broker_from_config(): assert actual.url == expected.url assert actual.sasl_username == expected.sasl_username assert actual.sasl_password == expected.sasl_password + assert actual.sasl_mechanism == expected.sasl_mechanism assert actual.topic == expected.topic assert actual.partition_by_sender == expected.partition_by_sender @@ -285,6 +287,9 @@ async def test_kafka_broker_from_config(): ("kafka_invalid_security_protocol.yml", ValueError), # `TypeError` exception is raised when there is no `url` specified ("kafka_plaintext_endpoint_no_url.yml", TypeError), + # `KafkaProducerInitializationError` is raised when an invalid + # `sasl_mechanism` is provided + ("kafka_invalid_sasl_mechanism.yml", KafkaProducerInitializationError), ], ) async def test_kafka_broker_security_protocols(file: Text, exception: Exception):