Skip to content

Commit 116e634

Browse files
committed
Merge pull request #636 from dpkp/french_connection
Simplify connection logic
2 parents 71b0d05 + c1ab194 commit 116e634

8 files changed

+64
-74
lines changed

Diff for: kafka/client.py

+26-11
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,12 @@ def _get_conn(self, host, port, afi):
6767
)
6868

6969
conn = self._conns[host_key]
70-
while conn.connect() == ConnectionStates.CONNECTING:
71-
pass
70+
timeout = time.time() + self.timeout
71+
while time.time() < timeout:
72+
if conn.connect() is ConnectionStates.CONNECTED:
73+
break
74+
else:
75+
raise ConnectionError("%s:%s (%s)" % (host, port, afi))
7276
return conn
7377

7478
def _get_leader_for_partition(self, topic, partition):
@@ -149,9 +153,11 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
149153
random.shuffle(hosts)
150154

151155
for (host, port, afi) in hosts:
152-
conn = self._get_conn(host, port, afi)
153-
if not conn.connected():
154-
log.warning("Skipping unconnected connection: %s", conn)
156+
try:
157+
conn = self._get_conn(host, port, afi)
158+
except ConnectionError:
159+
log.warning("Skipping unconnected connection: %s:%s (AFI %s)",
160+
host, port, afi)
155161
continue
156162
request = encoder_fn(payloads=payloads)
157163
future = conn.send(request)
@@ -233,9 +239,9 @@ def failed_payloads(payloads):
233239

234240

235241
host, port, afi = get_ip_port_afi(broker.host)
236-
conn = self._get_conn(host, broker.port, afi)
237-
conn.connect()
238-
if not conn.connected():
242+
try:
243+
conn = self._get_conn(host, broker.port, afi)
244+
except ConnectionError:
239245
refresh_metadata = True
240246
failed_payloads(broker_payloads)
241247
continue
@@ -419,10 +425,19 @@ def copy(self):
419425
return c
420426

421427
def reinit(self):
422-
for conn in self._conns.values():
428+
timeout = time.time() + self.timeout
429+
conns = set(self._conns.values())
430+
for conn in conns:
423431
conn.close()
424-
while conn.connect() == ConnectionStates.CONNECTING:
425-
pass
432+
conn.connect()
433+
434+
while time.time() < timeout:
435+
for conn in list(conns):
436+
conn.connect()
437+
if conn.connected():
438+
conns.remove(conn)
439+
if not conns:
440+
break
426441

427442
def reset_topic_metadata(self, *topics):
428443
for topic in topics:

Diff for: kafka/client_async.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ def _bootstrap(self, hosts):
118118
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
119119
bootstrap = BrokerConnection(host, port, afi, **self.config)
120120
bootstrap.connect()
121-
while bootstrap.state is ConnectionStates.CONNECTING:
121+
while bootstrap.connecting():
122122
bootstrap.connect()
123123
if bootstrap.state is not ConnectionStates.CONNECTED:
124124
bootstrap.close()
@@ -164,7 +164,7 @@ def _maybe_connect(self, node_id):
164164
self._conns[node_id] = BrokerConnection(host, broker.port, afi,
165165
**self.config)
166166
state = self._conns[node_id].connect()
167-
if state is ConnectionStates.CONNECTING:
167+
if self._conns[node_id].connecting():
168168
self._connecting.add(node_id)
169169

170170
# Whether CONNECTED or DISCONNECTED, we need to remove from connecting
@@ -251,7 +251,7 @@ def connection_delay(self, node_id):
251251
time_waited_ms = time.time() - (conn.last_attempt or 0)
252252
if conn.state is ConnectionStates.DISCONNECTED:
253253
return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0)
254-
elif conn.state is ConnectionStates.CONNECTING:
254+
elif conn.connecting():
255255
return 0
256256
else:
257257
return 999999999

Diff for: kafka/conn.py

+15-17
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ def connect(self):
7777
"""Attempt to connect and return ConnectionState"""
7878
if self.state is ConnectionStates.DISCONNECTED:
7979
self.close()
80+
log.debug('%s: creating new socket', str(self))
8081
self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
8182
if self.config['receive_buffer_bytes'] is not None:
8283
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
@@ -85,23 +86,9 @@ def connect(self):
8586
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
8687
self.config['send_buffer_bytes'])
8788
self._sock.setblocking(False)
88-
try:
89-
ret = self._sock.connect_ex((self.host, self.port))
90-
except socket.error as ret:
91-
pass
89+
self.state = ConnectionStates.CONNECTING
9290
self.last_attempt = time.time()
9391

94-
if not ret or ret == errno.EISCONN:
95-
self.state = ConnectionStates.CONNECTED
96-
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
97-
elif ret in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022):
98-
self.state = ConnectionStates.CONNECTING
99-
else:
100-
log.error('Connect attempt to %s returned error %s.'
101-
' Disconnecting.', self, ret)
102-
self.close()
103-
self.last_failure = time.time()
104-
10592
if self.state is ConnectionStates.CONNECTING:
10693
# in non-blocking mode, use repeated calls to socket.connect_ex
10794
# to check connection status
@@ -110,17 +97,27 @@ def connect(self):
11097
ret = self._sock.connect_ex((self.host, self.port))
11198
except socket.error as ret:
11299
pass
100+
101+
# Connection succeeded
113102
if not ret or ret == errno.EISCONN:
103+
log.debug('%s: established TCP connection', str(self))
114104
self.state = ConnectionStates.CONNECTED
105+
106+
# Connection failed
107+
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
115108
elif ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022):
116109
log.error('Connect attempt to %s returned error %s.'
117110
' Disconnecting.', self, ret)
118111
self.close()
119-
self.last_failure = time.time()
112+
113+
# Connection timedout
120114
elif time.time() > request_timeout + self.last_attempt:
121115
log.error('Connection attempt to %s timed out', self)
122116
self.close() # error=TimeoutError ?
123-
self.last_failure = time.time()
117+
118+
# Needs retry
119+
else:
120+
pass
124121

125122
return self.state
126123

@@ -155,6 +152,7 @@ def close(self, error=None):
155152
self._sock.close()
156153
self._sock = None
157154
self.state = ConnectionStates.DISCONNECTED
155+
self.last_failure = time.time()
158156
self._receiving = False
159157
self._next_payload_bytes = 0
160158
self._rbuffer.seek(0)

Diff for: test/conftest.py

+19
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,22 @@ def fin():
3131
k.close()
3232
request.addfinalizer(fin)
3333
return k
34+
35+
36+
@pytest.fixture
37+
def conn(mocker):
38+
from kafka.conn import ConnectionStates
39+
from kafka.future import Future
40+
from kafka.protocol.metadata import MetadataResponse
41+
conn = mocker.patch('kafka.client_async.BrokerConnection')
42+
conn.return_value = conn
43+
conn.state = ConnectionStates.CONNECTED
44+
conn.send.return_value = Future().success(
45+
MetadataResponse[0](
46+
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
47+
[])) # topics
48+
conn.blacked_out.return_value = False
49+
conn.connect.side_effect = lambda: conn.state
50+
conn.connecting = lambda: conn.connect() is ConnectionStates.CONNECTING
51+
conn.connected = lambda: conn.connect() is ConnectionStates.CONNECTED
52+
return conn

Diff for: test/test_client_async.py

-15
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,6 @@ def test_bootstrap_servers(mocker, bootstrap, expected_hosts):
3131
assert sorted(hosts) == sorted(expected_hosts)
3232

3333

34-
@pytest.fixture
35-
def conn(mocker):
36-
conn = mocker.patch('kafka.client_async.BrokerConnection')
37-
conn.return_value = conn
38-
conn.state = ConnectionStates.CONNECTED
39-
conn.send.return_value = Future().success(
40-
MetadataResponse[0](
41-
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
42-
[])) # topics
43-
conn.blacked_out.return_value = False
44-
conn.connect.side_effect = lambda: conn.state
45-
conn.connected = lambda: conn.connect() is ConnectionStates.CONNECTED
46-
return conn
47-
48-
4934
def test_bootstrap_success(conn):
5035
conn.state = ConnectionStates.CONNECTED
5136
cli = KafkaClient()

Diff for: test/test_conn.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def socket(mocker):
2424
@pytest.fixture
2525
def conn(socket):
2626
from socket import AF_INET
27-
conn = BrokerConnection('localhost', 9092, socket.AF_INET)
27+
conn = BrokerConnection('localhost', 9092, AF_INET)
2828
return conn
2929

3030

Diff for: test/test_consumer_group.py

-14
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
from kafka import SimpleClient
1010
from kafka.conn import ConnectionStates
1111
from kafka.consumer.group import KafkaConsumer
12-
from kafka.future import Future
13-
from kafka.protocol.metadata import MetadataResponse
1412
from kafka.structs import TopicPartition
1513

1614
from test.conftest import version
@@ -140,18 +138,6 @@ def test_paused(kafka_broker, topic):
140138
assert set() == consumer.paused()
141139

142140

143-
@pytest.fixture
144-
def conn(mocker):
145-
conn = mocker.patch('kafka.client_async.BrokerConnection')
146-
conn.return_value = conn
147-
conn.state = ConnectionStates.CONNECTED
148-
conn.send.return_value = Future().success(
149-
MetadataResponse[0](
150-
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
151-
[])) # topics
152-
return conn
153-
154-
155141
def test_heartbeat_timeout(conn, mocker):
156142
mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = '0.9')
157143
mocker.patch('time.time', return_value = 1234)

Diff for: test/test_coordinator.py

-13
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
from kafka.coordinator.consumer import ConsumerCoordinator
1313
from kafka.coordinator.protocol import (
1414
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
15-
from kafka.conn import ConnectionStates
1615
import kafka.errors as Errors
1716
from kafka.future import Future
1817
from kafka.protocol.commit import (
@@ -22,18 +21,6 @@
2221
from kafka.util import WeakMethod
2322

2423

25-
@pytest.fixture
26-
def conn(mocker):
27-
conn = mocker.patch('kafka.client_async.BrokerConnection')
28-
conn.return_value = conn
29-
conn.state = ConnectionStates.CONNECTED
30-
conn.send.return_value = Future().success(
31-
MetadataResponse[0](
32-
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
33-
[])) # topics
34-
return conn
35-
36-
3724
@pytest.fixture
3825
def coordinator(conn):
3926
return ConsumerCoordinator(KafkaClient(), SubscriptionState())

0 commit comments

Comments
 (0)