Commit a1c62b8

Add Kraft to Kafka containers
1 parent 090bd0d commit a1c62b8

3 files changed: +134 −9 lines
Lines changed: 30 additions & 0 deletions

@@ -0,0 +1,30 @@
+from typing import Callable
+
+from packaging.version import Version
+
+
+class ComparableVersion:
+    def __init__(self, version):
+        self.version = Version(version)
+
+    def __lt__(self, other: str):
+        return self._apply_op(other, lambda x, y: x < y)
+
+    def __le__(self, other: str):
+        return self._apply_op(other, lambda x, y: x <= y)
+
+    def __eq__(self, other: str):
+        return self._apply_op(other, lambda x, y: x == y)
+
+    def __ne__(self, other: str):
+        return self._apply_op(other, lambda x, y: x != y)
+
+    def __gt__(self, other: str):
+        return self._apply_op(other, lambda x, y: x > y)
+
+    def __ge__(self, other: str):
+        return self._apply_op(other, lambda x, y: x >= y)
+
+    def _apply_op(self, other: str, op: Callable[[Version, Version], bool]):
+        other = Version(other)
+        return op(self.version, other)
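
A quick sketch of how this comparison wrapper is expected to behave; the version strings below are illustrative, not taken from the commit:

from testcontainers.core.version import ComparableVersion

# A parsed version compares directly against plain version strings.
assert ComparableVersion("7.6.0") >= "7.0.0"
assert ComparableVersion("6.2.1") < "7.0.0"
assert ComparableVersion("7.0.0") == "7.0.0"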

modules/kafka/testcontainers/kafka/__init__.py

Lines changed: 98 additions & 9 deletions

@@ -1,10 +1,15 @@
+import base64
 import tarfile
 import time
+import uuid
 from io import BytesIO
 from textwrap import dedent

+from typing_extensions import Self
+
 from testcontainers.core.container import DockerContainer
 from testcontainers.core.utils import raise_for_deprecated_parameter
+from testcontainers.core.version import ComparableVersion
 from testcontainers.core.waiting_utils import wait_for_logs
 from testcontainers.kafka._redpanda import RedpandaContainer

@@ -26,18 +31,29 @@ class KafkaContainer(DockerContainer):

             >>> with KafkaContainer() as kafka:
             ...     connection = kafka.get_bootstrap_server()
+
+            # Using KRaft consensus protocol
+            >>> with KafkaContainer().with_kraft() as kafka:
+            ...     connection = kafka.get_bootstrap_server()
     """

     TC_START_SCRIPT = "/tc-start.sh"
+    MIN_KRAFT_TAG = "7.0.0"

     def __init__(self, image: str = "confluentinc/cp-kafka:7.6.0", port: int = 9093, **kwargs) -> None:
         raise_for_deprecated_parameter(kwargs, "port_to_expose", "port")
         super().__init__(image, **kwargs)
         self.port = port
+        self.kraft_enabled = False
+        self.wait_for = r".*\[KafkaServer id=\d+\] started.*"
+        self.boot_command = ""
+        self.cluster_id = self._random_uuid()
+        self.listeners = f"PLAINTEXT://0.0.0.0:{self.port},BROKER://0.0.0.0:9092"
+        self.security_protocol_map = "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
+
         self.with_exposed_ports(self.port)
-        listeners = f"PLAINTEXT://0.0.0.0:{self.port},BROKER://0.0.0.0:9092"
-        self.with_env("KAFKA_LISTENERS", listeners)
-        self.with_env("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+        self.with_env("KAFKA_LISTENERS", self.listeners)
+        self.with_env("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", self.security_protocol_map)
         self.with_env("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")

         self.with_env("KAFKA_BROKER_ID", "1")
@@ -46,6 +62,82 @@ def __init__(self, image: str = "confluentinc/cp-kafka:7.6.0", port: int = 9093,
         self.with_env("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", "10000000")
         self.with_env("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")

+    def with_kraft(self) -> Self:
+        self._verify_min_kraft_version()
+        self.kraft_enabled = True
+        return self
+
+    def _verify_min_kraft_version(self):
+        actual_version = self.image.split(":")[-1]
+
+        if ComparableVersion(actual_version) < self.MIN_KRAFT_TAG:
+            raise ValueError(
+                f"Provided Confluent Platform's version {actual_version} "
+                f"is not supported in Kraft mode"
+                f" (must be {self.MIN_KRAFT_TAG} or above)"
+            )
+
+    def with_cluster_id(self, cluster_id: str) -> Self:
+        self.cluster_id = cluster_id
+        return self
+
+    @classmethod
+    def _random_uuid(cls):
+        uuid_value = uuid.uuid4()
+        uuid_bytes = uuid_value.bytes
+        base64_encoded_uuid = base64.b64encode(uuid_bytes)
+
+        return base64_encoded_uuid.decode()
+
+    def configure(self):
+        if self.kraft_enabled:
+            self._configure_kraft()
+        else:
+            self._configure_zookeeper()
+
+    def _configure_kraft(self) -> None:
+        self.wait_for = r".*Kafka Server started.*"
+
+        self.with_env("CLUSTER_ID", self.cluster_id)
+        self.with_env("KAFKA_NODE_ID", 1)
+        self.with_env(
+            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
+            f"{self.security_protocol_map},CONTROLLER:PLAINTEXT",
+        )
+        self.with_env(
+            "KAFKA_LISTENERS",
+            f"{self.listeners},CONTROLLER://0.0.0.0:9094",
+        )
+        self.with_env("KAFKA_PROCESS_ROLES", "broker,controller")
+
+        network_alias = self._get_network_alias()
+        controller_quorum_voters = f"1@{network_alias}:9094"
+        self.with_env("KAFKA_CONTROLLER_QUORUM_VOTERS", controller_quorum_voters)
+        self.with_env("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
+
+        self.boot_command = f"""
+        sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
+        echo 'kafka-storage format --ignore-formatted -t {self.cluster_id} -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
+        """
+
+    def _get_network_alias(self):
+        if self._network:
+            return next(
+                iter(self._network_aliases or [self._network.name or self._kwargs.get("network", [])]),
+                None,
+            )
+
+        return "localhost"
+
+    def _configure_zookeeper(self) -> None:
+        self.boot_command = """
+        echo 'clientPort=2181' > zookeeper.properties
+        echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties
+        echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties
+        zookeeper-server-start zookeeper.properties &
+        export KAFKA_ZOOKEEPER_CONNECT='localhost:2181'
+        """
+
     def get_bootstrap_server(self) -> str:
         host = self.get_container_host_ip()
         port = self.get_exposed_port(self.port)
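
For orientation, a minimal sketch of how the new builder methods combine; only with_kraft, with_cluster_id, get_bootstrap_server, and the MIN_KRAFT_TAG check come from this commit, while the image tags and the explicitly generated cluster id are illustrative:

import base64
import uuid

from testcontainers.kafka import KafkaContainer

# KRaft mode, optionally pinning the cluster id instead of the random default.
cluster_id = base64.b64encode(uuid.uuid4().bytes).decode()  # same shape as _random_uuid()
with KafkaContainer("confluentinc/cp-kafka:7.6.0").with_kraft().with_cluster_id(cluster_id) as kafka:
    bootstrap = kafka.get_bootstrap_server()

# Images older than MIN_KRAFT_TAG ("7.0.0") are rejected before the container starts:
try:
    KafkaContainer("confluentinc/cp-kafka:6.2.0").with_kraft()
except ValueError as err:
    print(err)  # not supported in Kraft mode (must be 7.0.0 or above)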
@@ -59,11 +151,7 @@ def tc_start(self) -> None:
             dedent(
                 f"""
                 #!/bin/bash
-                echo 'clientPort=2181' > zookeeper.properties
-                echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties
-                echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties
-                zookeeper-server-start zookeeper.properties &
-                export KAFKA_ZOOKEEPER_CONNECT='localhost:2181'
+                {self.boot_command}
                 export KAFKA_ADVERTISED_LISTENERS={listeners}
                 . /etc/confluent/docker/bash-config
                 /etc/confluent/docker/configure
@@ -78,10 +166,11 @@ def tc_start(self) -> None:
     def start(self, timeout=30) -> "KafkaContainer":
         script = KafkaContainer.TC_START_SCRIPT
         command = f'sh -c "while [ ! -f {script} ]; do sleep 0.1; done; sh {script}"'
+        self.configure()
         self.with_command(command)
         super().start()
         self.tc_start()
-        wait_for_logs(self, r".*\[KafkaServer id=\d+\] started.*", timeout=timeout)
+        wait_for_logs(self, self.wait_for, timeout=timeout)
         return self

     def create_file(self, content: bytes, path: str) -> None:
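
Since configure() now runs inside start(), callers that manage the lifecycle by hand get the same ZooKeeper/KRaft selection as the context manager. A small sketch of that manual start/stop path; the 60-second timeout is an illustrative value, not from this commit:

from testcontainers.kafka import KafkaContainer

# start()/stop() lifecycle instead of the `with` block; pass a longer timeout
# if the broker needs more time on first boot.
kafka = KafkaContainer().with_kraft()
kafka.start(timeout=60)
try:
    bootstrap = kafka.get_bootstrap_server()  # "host:port" for client configs
finally:
    kafka.stop()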

modules/kafka/tests/test_kafka.py

Lines changed: 6 additions & 0 deletions

@@ -8,6 +8,12 @@ def test_kafka_producer_consumer():
         produce_and_consume_kafka_message(container)


+def test_kafka_with_kraft_producer_consumer():
+    with KafkaContainer().with_kraft() as container:
+        assert container.kraft_enabled
+        produce_and_consume_kafka_message(container)
+
+
 def test_kafka_producer_consumer_custom_port():
     with KafkaContainer(port=9888) as container:
         assert container.port == 9888
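
The produce_and_consume_kafka_message helper is not part of this diff; a hypothetical sketch of the round trip it presumably performs, written against kafka-python (an assumption, not shown in this commit):

from kafka import KafkaConsumer, KafkaProducer

# Hypothetical helper: produce one message to the container's bootstrap
# server and assert it can be read back.
def produce_and_consume_kafka_message(container):
    topic = "test-topic"
    bootstrap = container.get_bootstrap_server()

    producer = KafkaProducer(bootstrap_servers=[bootstrap])
    producer.send(topic, b"hello-kafka")
    producer.flush()

    consumer = KafkaConsumer(topic, bootstrap_servers=[bootstrap], auto_offset_reset="earliest")
    records = consumer.poll(timeout_ms=10000)
    assert any(rec.value == b"hello-kafka" for recs in records.values() for rec in recs)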
