Skip to content

Commit 4f62860

Browse files
committed
add Writer.close() and Client.close(), improve Reader.close()
make Reader.close() prevent nsqd_tcp_addresses from reconnecting Writer.close() also closes conns and stops from reconnecting Client.close() stops the periodic _check_last_recv_timestamps()
1 parent 5d545f2 commit 4f62860

File tree

3 files changed

+33
-6
lines changed

3 files changed

+33
-6
lines changed

nsq/client.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@
33
import time
44
import logging
55

6-
import tornado.ioloop
6+
from tornado.ioloop import IOLoop, PeriodicCallback
77

88
logger = logging.getLogger(__name__)
99

1010

1111
class Client(object):
1212
def __init__(self, **kwargs):
13-
self.io_loop = tornado.ioloop.IOLoop.current()
14-
tornado.ioloop.PeriodicCallback(self._check_last_recv_timestamps, 60 * 1000).start()
13+
self.io_loop = IOLoop.current()
14+
self.periodic_check = PeriodicCallback(self._check_last_recv_timestamps, 60 * 1000)
15+
self.periodic_check.start()
1516

1617
def _on_connection_identify(self, conn, data, **kwargs):
1718
logger.info('[%s:%s] IDENTIFY sent %r' % (conn.id, self.name, data))
@@ -75,3 +76,6 @@ def heartbeat(self, conn):
7576
:param conn: the :class:`nsq.AsyncConn` over which the heartbeat was received
7677
"""
7778
pass
79+
80+
def close(self):
81+
self.periodic_check.stop()

nsq/reader.py

+11-3
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ def __init__(
215215
self.backoff_block = False
216216
self.backoff_block_completed = True
217217

218+
self._closed = False
218219
self.conns = {}
219220
self.connection_attempts = {}
220221
self.http_client = tornado.httpclient.AsyncHTTPClient()
@@ -257,15 +258,19 @@ def _run(self):
257258

258259
def close(self):
259260
"""
260-
Closes all connections stops all periodic callbacks
261+
Closes all connections and stops all periodic callbacks
261262
"""
263+
self._closed = True
264+
262265
for conn in self.conns.values():
263266
conn.close()
264267

265268
self.redist_periodic.stop()
266269
if self.query_periodic is not None:
267270
self.query_periodic.stop()
268271

272+
super(Reader, self).close()
273+
269274
def set_message_handler(self, message_handler):
270275
"""
271276
Assigns the callback method to be executed for each message received
@@ -473,8 +478,8 @@ def connect_to_nsqd(self, host, port):
473478
# only attempt to re-connect once every 10s per destination
474479
# this throttles reconnects to failed endpoints
475480
now = time.time()
476-
last_connect_attempt = self.connection_attempts.get(conn.id)
477-
if last_connect_attempt and last_connect_attempt > now - 10:
481+
last_connect_attempt = self.connection_attempts.get(conn.id, 0)
482+
if last_connect_attempt > now - 10:
478483
return
479484
self.connection_attempts[conn.id] = now
480485

@@ -538,6 +543,9 @@ def _on_connection_close(self, conn, **kwargs):
538543
self.io_loop.remove_timeout(conn.rdy_timeout)
539544
conn.rdy_timeout = None
540545

546+
if self._closed:
547+
return
548+
541549
if not self.lookupd_http_addresses:
542550
# automatically reconnect to nsqd addresses when not using lookupd
543551
logger.info('[%s:%s] attempting to reconnect in 15s', conn.id, self.name)

nsq/writer.py

+15
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ def __init__(self, nsqd_tcp_addresses, reconnect_interval=15.0, name=None, **kwa
9696

9797
self.name = name or nsqd_tcp_addresses[0]
9898
self.nsqd_tcp_addresses = nsqd_tcp_addresses
99+
self._closed = False
99100
self.conns = {}
100101

101102
# Verify keyword arguments
@@ -235,6 +236,9 @@ def _on_connection_close(self, conn, **kwargs):
235236
logger.exception('[%s] uncaught exception in callback', conn.id)
236237

237238
logger.warning('[%s] connection closed', conn.id)
239+
if self._closed:
240+
return
241+
238242
logger.info('[%s] attempting to reconnect in %0.2fs', conn.id, self.reconnect_interval)
239243
reconnect_callback = functools.partial(self.connect_to_nsqd,
240244
host=conn.host, port=conn.port)
@@ -244,3 +248,14 @@ def _finish_pub(self, conn, data, command, topic, msg):
244248
if isinstance(data, protocol.Error):
245249
logger.error('[%s] failed to %s (%s, %s), data is %s',
246250
conn.id if conn else 'NA', command, topic, msg, data)
251+
252+
def close(self):
253+
"""
254+
Closes all connections and stops all periodic callbacks
255+
"""
256+
self._closed = True
257+
258+
for conn in self.conns.values():
259+
conn.close()
260+
261+
super(Writer, self).close()

0 commit comments

Comments
 (0)