Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tornado 5.x support #232

Merged
merged 4 commits into from
Apr 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,39 +1,36 @@
dist: xenial
language: python

matrix:
include:
- python: "2.7"
env: TORNADO_VERSION=4.4.3 NSQ_DOWNLOAD=nsq-0.3.8.linux-amd64.go1.6.2
- python: "2.7"
env: TORNADO_VERSION=4.5.3 NSQ_DOWNLOAD=nsq-0.3.8.linux-amd64.go1.6.2
env: TORNADO_VERSION=4.1 NSQ_DOWNLOAD=nsq-1.0.0-compat.linux-amd64.go1.8
- python: "2.7"
env: TORNADO_VERSION=4.4.3 NSQ_DOWNLOAD=nsq-1.0.0-compat.linux-amd64.go1.8
- python: "2.7"
env: TORNADO_VERSION=4.5.3 NSQ_DOWNLOAD=nsq-1.0.0-compat.linux-amd64.go1.8
- python: "2.7"
env: TORNADO_VERSION=4.5.3 NSQ_DOWNLOAD=nsq-1.1.0.linux-amd64.go1.10.3
- python: "2.7"
env: TORNADO_VERSION=5.1.1 NSQ_DOWNLOAD=nsq-1.1.0.linux-amd64.go1.10.3

- python: "3.4"
env: TORNADO_VERSION=4.4.3 NSQ_DOWNLOAD=nsq-0.3.8.linux-amd64.go1.6.2
- python: "3.4"
env: TORNADO_VERSION=4.5.3 NSQ_DOWNLOAD=nsq-1.0.0-compat.linux-amd64.go1.8
- python: "3.4"
env: TORNADO_VERSION=5.1.1 NSQ_DOWNLOAD=nsq-1.1.0.linux-amd64.go1.10.3

- python: "3.5"
env: TORNADO_VERSION=4.5.3 NSQ_DOWNLOAD=nsq-0.3.8.linux-amd64.go1.6.2
- python: "3.5"
env: TORNADO_VERSION=4.4.3 NSQ_DOWNLOAD=nsq-1.0.0-compat.linux-amd64.go1.8
env: TORNADO_VERSION=4.5.3 NSQ_DOWNLOAD=nsq-1.0.0-compat.linux-amd64.go1.8
- python: "3.5"
env: TORNADO_VERSION=4.4.3 NSQ_DOWNLOAD=nsq-1.1.0.linux-amd64.go1.10.3
env: TORNADO_VERSION=5.1.1 NSQ_DOWNLOAD=nsq-1.1.0.linux-amd64.go1.10.3

- python: "3.6"
env: TORNADO_VERSION=4.5.3 NSQ_DOWNLOAD=nsq-0.3.8.linux-amd64.go1.6.2
- python: "3.6"
env: TORNADO_VERSION=4.5.3 NSQ_DOWNLOAD=nsq-1.0.0-compat.linux-amd64.go1.8
- python: "3.6"
env: TORNADO_VERSION=5.1.1 NSQ_DOWNLOAD=nsq-1.1.0.linux-amd64.go1.10.3

- python: "3.7"
env: TORNADO_VERSION=4.5.3 NSQ_DOWNLOAD=nsq-1.0.0-compat.linux-amd64.go1.8
- python: "3.7"
env: TORNADO_VERSION=4.5.3 NSQ_DOWNLOAD=nsq-1.1.0.linux-amd64.go1.10.3
env: TORNADO_VERSION=5.1.1 NSQ_DOWNLOAD=nsq-1.1.0.linux-amd64.go1.10.3

script:
- ./travis_test.sh
Expand Down
11 changes: 3 additions & 8 deletions nsq/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,9 @@


class Client(object):
def __init__(self, io_loop=None, **kwargs):
self.io_loop = io_loop
if not self.io_loop:
self.io_loop = tornado.ioloop.IOLoop.instance()

tornado.ioloop.PeriodicCallback(self._check_last_recv_timestamps,
60 * 1000,
io_loop=self.io_loop).start()
def __init__(self, **kwargs):
self.io_loop = tornado.ioloop.IOLoop.current()
tornado.ioloop.PeriodicCallback(self._check_last_recv_timestamps, 60 * 1000).start()

def _on_connection_identify(self, conn, data, **kwargs):
logger.info('[%s:%s] IDENTIFY sent %r' % (conn.id, self.name, data))
Expand Down
52 changes: 21 additions & 31 deletions nsq/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,6 @@
import tornado.iostream
import tornado.ioloop

try:
from tornado.simple_httpclient import _default_ca_certs as default_ca_certs
except ImportError:
# Tornado < 4
from tornado.simple_httpclient import _DEFAULT_CA_CERTS

def default_ca_certs():
return _DEFAULT_CA_CERTS

from nsq import event, protocol
from .deflate_socket import DeflateSocket, DeflateEncoder

Expand Down Expand Up @@ -146,7 +137,6 @@ def __init__(
output_buffer_size=16 * 1024,
output_buffer_timeout=250,
sample_rate=0,
io_loop=None,
auth_secret=None,
msg_timeout=None,
hostname=None):
Expand Down Expand Up @@ -187,9 +177,6 @@ def __init__(
self.heartbeat_interval = heartbeat_interval * 1000
self.msg_timeout = int(msg_timeout * 1000) if msg_timeout else None
self.requeue_delay = requeue_delay
self.io_loop = io_loop
if not self.io_loop:
self.io_loop = tornado.ioloop.IOLoop.instance()

self.output_buffer_size = output_buffer_size
self.output_buffer_timeout = output_buffer_timeout
Expand Down Expand Up @@ -235,7 +222,7 @@ def connect(self):
self.socket.settimeout(self.timeout)
self.socket.setblocking(0)

self.stream = tornado.iostream.IOStream(self.socket, io_loop=self.io_loop)
self.stream = tornado.iostream.IOStream(self.socket)
self.stream.set_close_callback(self._socket_close)
self.stream.set_nodelay(True)

Expand Down Expand Up @@ -263,6 +250,8 @@ def _read_bytes(self, size, callback):
)

def _start_read(self):
if self.stream is None:
return # IOStream.start_tls() invalidates stream, will call again when ready
self._read_bytes(4, self._read_size)

def _socket_close(self):
Expand Down Expand Up @@ -297,28 +286,29 @@ def send(self, data):

def upgrade_to_tls(self, options=None):
# in order to upgrade to TLS we need to *replace* the IOStream...
#
# first remove the event handler for the currently open socket
# so that when we add the socket to the new SSLIOStream below,
# it can re-add the appropriate event handlers. Default to TLSv1.2
# unless ssl_version is set otherwise.
self.io_loop.remove_handler(self.socket.fileno())

opts = {
'cert_reqs': ssl.CERT_REQUIRED,
'ca_certs': default_ca_certs(),
'ssl_version': ssl.PROTOCOL_TLSv1_2
}
opts.update(options or {})
self.socket = ssl.wrap_socket(self.socket,
do_handshake_on_connect=False, **opts)

self.stream = tornado.iostream.SSLIOStream(self.socket, io_loop=self.io_loop)
self.stream.set_close_callback(self._socket_close)
fut = self.stream.start_tls(False, ssl_options=opts, server_hostname=self.host)
Copy link
Member Author

@ploxiln ploxiln Mar 27, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we were checking if the TLS server certificate was valid for the hostname connected to. Now, if not ssl.CERT_NONE, this will (I think) check hostname ...

self.stream = None

def finish_upgrade_tls(fut):
try:
self.stream = fut.result()
self.socket = self.stream.socket
self._start_read()
except Exception as e:
# skip self.close() because no stream
self.trigger(
event.ERROR,
conn=self,
error=protocol.SendError('failed to upgrade to TLS', e),
)

# now that the IOStream has been swapped we can kickstart
# the SSL handshake
self.stream._do_ssl_handshake()
tornado.ioloop.IOLoop.current().add_future(fut, finish_upgrade_tls)

def upgrade_to_snappy(self):
assert SnappySocket, 'snappy requires the python-snappy package'
Expand All @@ -341,7 +331,7 @@ def upgrade_to_deflate(self):
# is currently in place (normal or SSL)...
#
# first read any compressed bytes the existing IOStream might have
# already buffered and use that to bootstrap the DefalteSocket, then
# already buffered and use that to bootstrap the DeflateSocket, then
# monkey patch the existing IOStream by replacing its socket
# with a wrapper that will automagically handle compression.
existing_data = self.stream._consume(self.stream._read_buffer_size)
Expand Down Expand Up @@ -449,7 +439,7 @@ def _on_response_continue(self, data, **kwargs):
self.upgrade_to_snappy()
elif feature == 'deflate':
self.upgrade_to_deflate()
# the server will 'OK' after these conneciton upgrades triggering another response
# the server will 'OK' after these connection upgrades triggering another response
return

self.off(event.RESPONSE, self._on_response_continue)
Expand Down
11 changes: 11 additions & 0 deletions nsq/deflate_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ def recv(self, size):
def read(self, size):
return self._recv(size, self._socket.read)

def recv_into(self, buf, nbytes=0):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this method necessary?

Copy link
Member Author

@ploxiln ploxiln Apr 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's the socket method which tornado-5.x uses instead of recv(), for potential efficiency improvement: tornadoweb/tornado@1215cd2

# no real support of efficient recv_into()
n = nbytes or len(buf)
data = self.recv(n)
r = len(data)
if r > n:
self._bootstrapped = data[n:]
r = n
buf[:r] = data[:r]
return r

def _recv(self, size, method):
if self._bootstrapped:
data = self._bootstrapped
Expand Down
4 changes: 1 addition & 3 deletions nsq/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def __init__(

self.conns = {}
self.connection_attempts = {}
self.http_client = tornado.httpclient.AsyncHTTPClient(io_loop=self.io_loop)
self.http_client = tornado.httpclient.AsyncHTTPClient()

# will execute when run() is called (for all Reader instances)
self.io_loop.add_callback(self._run)
Expand All @@ -237,7 +237,6 @@ def _run(self):
self.redist_periodic = PeriodicCallback(
self._redistribute_rdy_state,
5 * 1000,
io_loop=self.io_loop,
)
self.redist_periodic.start()

Expand All @@ -249,7 +248,6 @@ def _run(self):
self.query_periodic = PeriodicCallback(
self.query_lookupd,
self.lookupd_poll_interval * 1000,
io_loop=self.io_loop,
)

# randomize the time we start this poll loop so that all
Expand Down
11 changes: 11 additions & 0 deletions nsq/snappy_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ def recv(self, size):
def read(self, size):
return self._recv(size, self._socket.read)

def recv_into(self, buf, nbytes=0):
# no real support of efficient recv_into()
n = nbytes or len(buf)
data = self.recv(n)
r = len(data)
if r > n:
self._bootstrapped = data[n:]
r = n
buf[:r] = data[:r]
return r

def _recv(self, size, method):
if self._bootstrapped:
data = self._bootstrapped
Expand Down
2 changes: 1 addition & 1 deletion nsq/version.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# also update in setup.py
__version__ = '0.8.3'
__version__ = '0.9.0-beta1'
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def run_tests(self):


# also update in nsq/version.py
version = '0.8.3'
version = '0.9.0b1'


setup(
Expand All @@ -32,10 +32,10 @@ def run_tests(self):
version
),
packages=['nsq'],
install_requires=['tornado<5.0'],
install_requires=['tornado<6'],
include_package_data=True,
zip_safe=False,
tests_require=['pytest>=3.3.1', 'mock', 'python-snappy', 'certifi'],
tests_require=['pytest>=3.3.1', 'mock', 'python-snappy'],
cmdclass={'test': PyTest},
classifiers=[
'Development Status :: 6 - Mature',
Expand Down
8 changes: 4 additions & 4 deletions tests/reader_unit_test_helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from __future__ import absolute_import
from __future__ import with_statement

from mock import patch, create_autospec
from tornado.ioloop import IOLoop
Expand All @@ -10,12 +9,13 @@
_conn_port = 4150


def get_reader(io_loop=None, max_in_flight=5):
def get_reader(max_in_flight=5):
return nsq.Reader("test", "test",
message_handler=_message_handler,
lookupd_http_addresses=["http://test.local:4161"],
lookupd_http_addresses=["http://localhost:4161"],
max_in_flight=max_in_flight,
io_loop=io_loop)
max_backoff_duration=2.0,
)


def get_ioloop():
Expand Down
12 changes: 4 additions & 8 deletions tests/test_async.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from __future__ import absolute_import

from __future__ import with_statement
import os
import sys

from mock import patch, create_autospec, MagicMock
from mock import patch, create_autospec
from tornado.iostream import IOStream

# shunt '..' into sys.path since we are in a 'tests' subdirectory
Expand All @@ -22,8 +20,8 @@ def f(*args, **kwargs):
pass


def _get_test_conn(io_loop=None):
conn = AsyncConn('test', 4150, io_loop=io_loop)
def _get_test_conn():
conn = AsyncConn('test', 4150)
# now set the stream attribute, which is ordinarily set in conn.connect()
conn.stream = create_autospec(IOStream)
return conn
Expand Down Expand Up @@ -94,9 +92,7 @@ def test_read_size():


def test_read_body():
mock_io_loop = MagicMock()

conn = _get_test_conn(io_loop=mock_io_loop)
conn = _get_test_conn()
on_data = create_autospec(f)
conn.on('data', on_data)

Expand Down
Loading