Skip to content

Commit

Permalink
Merge pull request #8860 from RasaHQ/kafka-sasl-2-3-x-backport
Browse files Browse the repository at this point in the history
Added SASL mechanism as Optional Kafka Parameter (2.3.X Backport)
  • Loading branch information
virtualroot authored Jun 16, 2021
2 parents 9f682c0 + bdb26c0 commit cf0b93c
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 21 deletions.
1 change: 1 addition & 0 deletions changelog/8860.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added `sasl_mechanism` as an optional configurable parameters for the [Kafka Producer](event-brokers.mdx#kafka-event-broker).
12 changes: 12 additions & 0 deletions data/test_endpoints/event_brokers/kafka_invalid_sasl_mechanism.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
event_broker:
type: kafka
security_protocol: SASL_SSL
topic: topic
url: localhost
sasl_username: username
sasl_password: password
sasl_mechanism: SOMETHING
ssl_cafile: CARoot.pem
ssl_certfile: certificate.pem
ssl_keyfile: key.pem
ssl_check_hostname: True
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ event_broker:
url: localhost
sasl_username: username
sasl_password: password
sasl_mechanism: PLAIN
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ event_broker:
url: localhost
sasl_username: username
sasl_password: password
sasl_mechanism: PLAIN
ssl_cafile: CARoot.pem
ssl_certfile: certificate.pem
ssl_keyfile: key.pem
Expand Down
51 changes: 31 additions & 20 deletions rasa/core/brokers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,26 @@
from rasa.core.brokers.broker import EventBroker
from rasa.shared.utils.io import DEFAULT_ENCODING
from rasa.utils.endpoints import EndpointConfig
from rasa.shared.exceptions import RasaException

logger = logging.getLogger(__name__)


class KafkaProducerInitializationError(RasaException):
"""Raised if the Kafka Producer cannot be properly initialized."""


class KafkaEventBroker(EventBroker):
"""Kafka event broker."""

def __init__(
self,
url: Union[Text, List[Text], None],
topic: Text = "rasa_core_events",
client_id: Optional[Text] = None,
sasl_username: Optional[Text] = None,
sasl_password: Optional[Text] = None,
sasl_mechanism: Optional[Text] = "PLAIN",
ssl_cafile: Optional[Text] = None,
ssl_certfile: Optional[Text] = None,
ssl_keyfile: Optional[Text] = None,
Expand All @@ -43,6 +51,10 @@ def __init__(
logging with respect to producer group administration.
sasl_username: Username for plain authentication.
sasl_password: Password for plain authentication.
sasl_mechanism: Authentication mechanism when security_protocol is
configured for SASL_PLAINTEXT or SASL_SSL.
Valid values are: PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256,
SCRAM-SHA-512. Default: `PLAIN`
ssl_cafile: Optional filename of ca file to use in certificate
verification.
ssl_certfile: Optional filename of file in pem format containing
Expand All @@ -65,6 +77,7 @@ def __init__(
self.security_protocol = security_protocol.upper()
self.sasl_username = sasl_username
self.sasl_password = sasl_password
self.sasl_mechanism = sasl_mechanism
self.ssl_cafile = ssl_cafile
self.ssl_certfile = ssl_certfile
self.ssl_keyfile = ssl_keyfile
Expand Down Expand Up @@ -123,53 +136,51 @@ def _create_producer(self) -> None:
import kafka

if self.security_protocol == "PLAINTEXT":
self.producer = kafka.KafkaProducer(
client_id=self.client_id,
bootstrap_servers=self.url,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
security_protocol=self.security_protocol,
ssl_check_hostname=False,
authentication_params = dict(
security_protocol=self.security_protocol, ssl_check_hostname=False,
)
elif self.security_protocol == "SASL_PLAINTEXT":
self.producer = kafka.KafkaProducer(
client_id=self.client_id,
bootstrap_servers=self.url,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
authentication_params = dict(
sasl_plain_username=self.sasl_username,
sasl_plain_password=self.sasl_password,
sasl_mechanism="PLAIN",
sasl_mechanism=self.sasl_mechanism,
security_protocol=self.security_protocol,
)
elif self.security_protocol == "SSL":
self.producer = kafka.KafkaProducer(
client_id=self.client_id,
bootstrap_servers=self.url,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
authentication_params = dict(
ssl_cafile=self.ssl_cafile,
ssl_certfile=self.ssl_certfile,
ssl_keyfile=self.ssl_keyfile,
ssl_check_hostname=False,
security_protocol=self.security_protocol,
)
elif self.security_protocol == "SASL_SSL":
self.producer = kafka.KafkaProducer(
client_id=self.client_id,
bootstrap_servers=self.url,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
authentication_params = dict(
sasl_plain_username=self.sasl_username,
sasl_plain_password=self.sasl_password,
sasl_mechanism=self.sasl_mechanism,
ssl_cafile=self.ssl_cafile,
ssl_certfile=self.ssl_certfile,
ssl_keyfile=self.ssl_keyfile,
ssl_check_hostname=self.ssl_check_hostname,
security_protocol=self.security_protocol,
sasl_mechanism="PLAIN",
)
else:
raise ValueError(
f"Cannot initialise `KafkaEventBroker`: "
f"Invalid `security_protocol` ('{self.security_protocol}')."
)
try:
self.producer = kafka.KafkaProducer(
client_id=self.client_id,
bootstrap_servers=self.url,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
**authentication_params,
)
except AssertionError as e:
raise KafkaProducerInitializationError(
f"Cannot initialise `KafkaEventBroker`: {e}"
)

def _publish(self, event) -> None:
logger.debug(f"Calling kafka send({self.topic}, {event})")
Expand Down
7 changes: 6 additions & 1 deletion tests/core/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import rasa.utils.io
from rasa.core.brokers.broker import EventBroker
from rasa.core.brokers.file import FileEventBroker
from rasa.core.brokers.kafka import KafkaEventBroker
from rasa.core.brokers.kafka import KafkaEventBroker, KafkaProducerInitializationError
from rasa.core.brokers.pika import PikaEventBroker, DEFAULT_QUEUE_NAME
from rasa.core.brokers.sql import SQLEventBroker
from rasa.shared.core.events import Event, Restarted, SlotSet, UserUttered
Expand Down Expand Up @@ -246,13 +246,15 @@ async def test_kafka_broker_from_config():
"localhost",
sasl_username="username",
sasl_password="password",
sasl_mechanism="PLAIN",
topic="topic",
security_protocol="SASL_PLAINTEXT",
)

assert actual.url == expected.url
assert actual.sasl_username == expected.sasl_username
assert actual.sasl_password == expected.sasl_password
assert actual.sasl_mechanism == expected.sasl_mechanism
assert actual.topic == expected.topic


Expand All @@ -270,6 +272,9 @@ async def test_kafka_broker_from_config():
("kafka_invalid_security_protocol.yml", ValueError),
# `TypeError` exception is raised when there is no `url` specified
("kafka_plaintext_endpoint_no_url.yml", TypeError),
# `KafkaProducerInitializationError` is raised when an invalid
# `sasl_mechanism` is provided
("kafka_invalid_sasl_mechanism.yml", KafkaProducerInitializationError),
],
)
async def test_kafka_broker_security_protocols(file: Text, exception: Exception):
Expand Down

0 comments on commit cf0b93c

Please sign in to comment.