Skip to content

Commit

Permalink
feat(consumer_lag): Add possibility to use SASL auth into Kafka - for…
Browse files Browse the repository at this point in the history
… MSK
  • Loading branch information
Smejky338 committed Dec 12, 2023
1 parent f924e69 commit 5a52439
Showing 1 changed file with 38 additions and 12 deletions.
50 changes: 38 additions & 12 deletions opl/consumer_lag.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,52 @@ class ConsumerLag:
bootstrap_server and kafka group as input.
"""

def __init__(self, topic, bootstrap_servers, group) -> None:
def __init__(
self, topic, bootstrap_servers, group, username="", password=""
) -> None:
self.topic = topic
self.group = group
self.bootstrap_servers = bootstrap_servers
self.logger = logging.getLogger("consumer_lag")
self.offset_records = {}
self.username = username
self.password = password

def _getconsumer(self):
consumer = KafkaConsumer(
bootstrap_servers=self.bootstrap_servers,
auto_offset_reset="latest",
enable_auto_commit=False,
max_poll_records=50,
max_poll_interval_ms=300000,
group_id=self.group,
session_timeout_ms=50000,
heartbeat_interval_ms=10000,
consumer_timeout_ms=100000,
)
# Common parameters for both cases
common_params = {
topic: self.topic,
bootstrap_servers: self.bootstrap_servers,
auto_offset_reset: "latest",
enable_auto_commit: False,
max_poll_records: 50,
max_poll_interval_ms: 300000,
group_id: self.group,
session_timeout_ms: 50000,
heartbeat_interval_ms: 10000,
consumer_timeout_ms: 100000,
}

# Kafka consumer creation: SASL or noauth

try:
logging.info(
f"Creating SASL password-protected Kafka consumer for {self.bootstrap_servers} in group {self.group} with timeout {self.kafka_timeout} ms topic {kafka_topic}"
)
if self.username == "" or self.password == "":
raise ValueError("Password or username not provided!")
sasl_params = {
"security_protocol": "SASL_SSL",
"sasl_mechanism": "SCRAM-SHA-512",
"sasl_plain_username": self.username,
"sasl_plain_password": self.password,
}
consumer = KafkaConsumer(**common_params, **sasl_params)
except (ValueError, AttributeError) as e:
logging.info(
f"Creating passwordless producer for for {kafka_hosts} in group {kafka_group} with timeout {kafka_timeout} ms topic {kafka_topic}"
)
consumer = KafkaConsumer(**common_params)
return consumer

def store_offset_records(self):
Expand Down

0 comments on commit 5a52439

Please sign in to comment.