diff --git a/appveyor.yml b/appveyor.yml new file mode 100644 index 0000000..6c6b9e0 --- /dev/null +++ b/appveyor.yml @@ -0,0 +1,35 @@ +build: false + +platform: + - x64 + +environment: + matrix: + - PYTHON_VERSION: 3.6 + MINICONDA_DIRNAME: C:\Miniconda36-x64 + + - PYTHON_VERSION: 3.5 + MINICONDA_DIRNAME: C:\Miniconda35-x64 + + - PYTHON_VERSION: 3.4 + MINICONDA_DIRNAME: C:\Miniconda34-x64 + +matrix: + fast_finish: false + +init: + - ECHO %PYTHON_VERSION% %MINICONDA_DIRNAME% + +install: + - cmd: set "PATH=%MINICONDA_DIRNAME%;%MINICONDA_DIRNAME%\\Scripts;%PATH%" + - cmd: conda config --set always_yes true + - cmd: conda update --quiet conda + - cmd: conda info --all + - cmd: conda create --quiet --name conda-env-%PYTHON_VERSION% python=%PYTHON_VERSION% + - cmd: activate conda-env-%PYTHON_VERSION% + - cmd: pip install --disable-pip-version-check --user --upgrade pip + - cmd: pip install -e . + - cmd: pip install pytest pytest-xdist + +test_script: + - cmd: pytest --color=yes -n 2 -sv diff --git a/docs/source/about.rst b/docs/source/about.rst index c675f3f..438b3a2 100644 --- a/docs/source/about.rst +++ b/docs/source/about.rst @@ -112,7 +112,8 @@ always take this into account: #. osBrain uses `pickle `_ for serialization by default, which means that the system performance may as well depend on this package. Serialization is configurable, though. -#. osBrain default transport is IPC by default, but :ref:`it can be changed +#. osBrain default transport is IPC for operating systems that provide UNIX + domain sockets, and TCP for the rest. :ref:`It can be changed globally or configured specifically for each bind `. Note, however, that when using TCP, the network may have a great impact on performance. diff --git a/docs/source/transport_protocol.rst b/docs/source/transport_protocol.rst index d14cd23..320f391 100644 --- a/docs/source/transport_protocol.rst +++ b/docs/source/transport_protocol.rst @@ -12,7 +12,8 @@ Transport protocol Available transports ==================== -Althought the default transport protocol is IPC, there are other transport +Althought the default transport protocol is IPC for operating systems that +provide UNIX domain sockets and TCP for the rest, there are other transport protocols that can be used in osBrain: - ``tcp``: common `TCP `_. Can always be used, and must be used to communicate agents running in diff --git a/osbrain/__init__.py b/osbrain/__init__.py index 5e08c57..17fe264 100644 --- a/osbrain/__init__.py +++ b/osbrain/__init__.py @@ -14,8 +14,9 @@ def mkdir(directory, **kwargs): import Pyro4 Pyro4.config.SERIALIZERS_ACCEPTED.add('pickle') +Pyro4.config.SERIALIZERS_ACCEPTED.add('cloudpickle') Pyro4.config.SERIALIZERS_ACCEPTED.add('dill') -Pyro4.config.SERIALIZER = 'dill' +Pyro4.config.SERIALIZER = 'cloudpickle' Pyro4.config.THREADPOOL_SIZE = 16 Pyro4.config.SERVERTYPE = 'thread' Pyro4.config.REQUIRE_EXPOSE = False diff --git a/osbrain/agent.py b/osbrain/agent.py index 8026e56..82fa0a3 100644 --- a/osbrain/agent.py +++ b/osbrain/agent.py @@ -298,7 +298,7 @@ def _handle_loopback(self, message): """ Handle incoming messages in the loopback socket. """ - header, data = dill.loads(message) + header, data = cloudpickle.loads(message) if header == 'EXECUTE_METHOD': method, args, kwargs = data try: @@ -316,7 +316,7 @@ def _handle_loopback_safe(self, data): """ Handle incoming messages in the _loopback_safe socket. """ - method, args, kwargs = dill.loads(data) + method, args, kwargs = cloudpickle.loads(data) try: response = getattr(self, method)(*args, **kwargs) except Exception as error: @@ -345,7 +345,7 @@ def _loopback(self, header, data=None): """ if not self._running: raise NotImplementedError() - data = dill.dumps((header, data)) + data = cloudpickle.dumps((header, data)) return self._loopback_reqrep('inproc://loopback', data) def safe_call(self, method, *args, **kwargs): @@ -366,7 +366,7 @@ def safe_call(self, method, *args, **kwargs): if not self._running: raise RuntimeError( 'Agent must be running to safely execute methods!') - data = dill.dumps((method, args, kwargs)) + data = cloudpickle.dumps((method, args, kwargs)) return self._loopback_reqrep('inproc://_loopback_safe', data) def each(self, period, method, *args, alias=None, **kwargs): @@ -1801,7 +1801,7 @@ def __init__(self, name='', nsaddr=None, addr=None, serializer=None, self.nsaddr = nsaddr self._serializer = serializer self._transport = transport - self.base = base + self.base = cloudpickle.dumps(base) self._shutdown_event = multiprocessing.Event() self._queue = multiprocessing.Queue() self._sigint = False @@ -1817,6 +1817,7 @@ def run(self): try: ns = NSProxy(self.nsaddr) self._daemon = Pyro4.Daemon(self._host, self.port) + self.base = cloudpickle.loads(self.base) self.agent = self.base(name=self.name, host=self._host, serializer=self._serializer, transport=self._transport, diff --git a/osbrain/nameserver.py b/osbrain/nameserver.py index f0aee88..d07823b 100644 --- a/osbrain/nameserver.py +++ b/osbrain/nameserver.py @@ -6,6 +6,7 @@ import time import random import multiprocessing +import cloudpickle import Pyro4 from Pyro4.naming import BroadcastServer @@ -78,7 +79,7 @@ class NameServerProcess(multiprocessing.Process): def __init__(self, addr=None, base=NameServer): super().__init__() self._daemon = None - self._base = base + self._base = cloudpickle.dumps(base) if isinstance(addr, int): addr = '127.0.0.1:%s' % addr self.addr = addr @@ -91,8 +92,7 @@ def run(self): """ Begin execution of the name server process and start the main loop. """ - # Capture SIGINT - + self._base = cloudpickle.loads(self._base) try: Pyro4.naming.NameServer = self._base self._daemon = Pyro4.naming.NameServerDaemon(self.host, self.port) diff --git a/osbrain/tests/common.py b/osbrain/tests/common.py index a37bce1..cdf5663 100644 --- a/osbrain/tests/common.py +++ b/osbrain/tests/common.py @@ -1,10 +1,24 @@ +import sys import pytest +from pytest import mark from osbrain import run_agent from osbrain import run_logger from osbrain import run_nameserver from osbrain.helper import sync_agent_logger +skip_windows = mark.skipif(sys.platform == 'win32', + reason='Not supported on windows') +skip_windows_port_reuse = mark.skipif(sys.platform == 'win32', + reason='Windows allows port reuse') +skip_windows_any_port = mark.skipif(sys.platform == 'win32', + reason='Windows allows binding to well ' + 'known ports') +skip_windows_spawn = mark.skipif(sys.platform == 'win32', + reason='Windows does not support fork') +skip_windows_ipc = mark.skipif(sys.platform == 'win32', + reason='Windows does not support IPC') + def append_received(agent, message, topic=None): agent.received.append(message) diff --git a/osbrain/tests/test_agent.py b/osbrain/tests/test_agent.py index da9a267..ad1efe5 100644 --- a/osbrain/tests/test_agent.py +++ b/osbrain/tests/test_agent.py @@ -28,12 +28,16 @@ from common import append_received from common import set_received +from common import skip_windows_spawn +from common import skip_windows_any_port +from common import skip_windows_port_reuse + def test_agent_uuid(): """ All agent identifiers should be unique strings. """ - N = 1000 + N = 100 bunch = set(Agent()._uuid for i in range(N)) assert len(bunch) == N assert all(isinstance(identifier, bytes) for identifier in bunch) @@ -265,6 +269,7 @@ def test_bind_tcp_addr_specific_port(nsproxy): assert address.address.port == port +@skip_windows_spawn @pytest.mark.parametrize('linger, sleep_time, should_receive', [ (2, 1, True), (0.5, 1, False), @@ -301,7 +306,7 @@ def on_init(self): puller.bind('PULL', alias='pull', handler=append_received, addr=address.address, transport='tcp') - assert should_receive == wait_agent_attr(puller, data='foo', timeout=.2) + assert should_receive == wait_agent_attr(puller, data='foo', timeout=1) def test_pushpull(nsproxy): @@ -599,6 +604,7 @@ def test_running_exception(nsproxy): assert not agent.is_running() +@skip_windows_port_reuse def test_agent_error_address_already_in_use(nsproxy): """ Running an agent should raise an error if address is already in use. @@ -609,6 +615,7 @@ def test_agent_error_address_already_in_use(nsproxy): assert 'Address already in use' in str(error.value) +@skip_windows_any_port def test_agent_error_permission_denied(nsproxy): """ Running an agent should raise an error if it has not sufficient diff --git a/osbrain/tests/test_agent_ipc_sockets.py b/osbrain/tests/test_agent_ipc_sockets.py index c8d7736..d028d87 100644 --- a/osbrain/tests/test_agent_ipc_sockets.py +++ b/osbrain/tests/test_agent_ipc_sockets.py @@ -9,6 +9,9 @@ from osbrain.helper import wait_condition from common import nsproxy # pragma: no flakes +from common import skip_windows_ipc + +pytestmark = skip_windows_ipc def test_agent_close_ipc_socket_agent_shutdown(nsproxy): diff --git a/osbrain/tests/test_agent_transport.py b/osbrain/tests/test_agent_transport.py index a2eb249..abb62a6 100644 --- a/osbrain/tests/test_agent_transport.py +++ b/osbrain/tests/test_agent_transport.py @@ -15,27 +15,37 @@ from common import nsproxy # pragma: no flakes from common import append_received +from common import skip_windows_spawn +from common import skip_windows_ipc -def test_agent_bind_transport_global(nsproxy): +def test_agent_bind_transport_platform_default(nsproxy): """ - Test global default transport. + Default transport is platform-dependent. """ - # Default transport agent = run_agent('a0') address = agent.bind('PUSH') - assert address.transport == 'ipc' + if os.name == 'posix': + assert address.transport == 'ipc' + else: + assert address.transport == 'tcp' - # Changing default global transport - osbrain.config['TRANSPORT'] = 'tcp' - agent = run_agent('a1') + +@skip_windows_spawn +def test_agent_bind_transport_global(nsproxy): + """ + Test global default transport change. + """ + # Default transport is not `inproc` + agent = run_agent('a0') address = agent.bind('PUSH') - assert address.transport == 'tcp' + assert address.transport != 'inproc' - osbrain.config['TRANSPORT'] = 'ipc' - agent = run_agent('a2') + # Changing default global transport to `inproc` + osbrain.config['TRANSPORT'] = 'inproc' + agent = run_agent('a1') address = agent.bind('PUSH') - assert address.transport == 'ipc' + assert address.transport == 'inproc' def test_agent_bind_transport_agent(nsproxy): @@ -46,9 +56,9 @@ def test_agent_bind_transport_agent(nsproxy): address = agent.bind('PUSH') assert address.transport == 'tcp' - agent = run_agent('a1', transport='ipc') + agent = run_agent('a1', transport='inproc') address = agent.bind('PUSH') - assert address.transport == 'ipc' + assert address.transport == 'inproc' def test_agent_bind_transport_bind(nsproxy): @@ -64,18 +74,21 @@ def test_agent_bind_transport_bind(nsproxy): assert address.transport == 'inproc' -def test_agent_bind_given_address(nsproxy): +@skip_windows_ipc +def test_agent_bind_given_address_ipc(nsproxy): """ Test agent binding to an specified address using TCP and IPC transport layers. """ agent = run_agent('a0') - # IPC ipc_addr = str(uuid4()) address = agent.bind('PUSH', addr=ipc_addr, transport='ipc') assert address.transport == 'ipc' assert address.address.name == ipc_addr - # TCP + + +def test_agent_bind_given_address_tcp(nsproxy): + agent = run_agent('a0') while True: try: # Bind to random port @@ -90,6 +103,7 @@ def test_agent_bind_given_address(nsproxy): assert address.address == tcp_addr +@skip_windows_ipc def test_agent_ipc_from_different_folders(nsproxy): """ IPC should work well even when agents are run from different folders. diff --git a/osbrain/tests/test_nameserver.py b/osbrain/tests/test_nameserver.py index 67b61bf..bdaad8e 100644 --- a/osbrain/tests/test_nameserver.py +++ b/osbrain/tests/test_nameserver.py @@ -22,6 +22,8 @@ from osbrain.nameserver import random_nameserver_process from common import nsproxy # pragma: no flakes +from common import skip_windows_any_port +from common import skip_windows_port_reuse def test_nameserver_ping(nsproxy): @@ -326,6 +328,7 @@ def test_nameserver_agent_address(nsproxy): assert nsproxy.addr('a1', 'bar') == addr1 +@skip_windows_any_port def test_random_nameserver_process(): """ Basic random_nameserver_process function tests: port range and exceptions. @@ -346,6 +349,7 @@ def test_random_nameserver_process(): random_nameserver_process(port_start=22, port_stop=22, timeout=0.5) +@skip_windows_port_reuse def test_nameserver_oserror(nsproxy): """ Name server start() should raise an error if address is already in use. @@ -356,6 +360,7 @@ def test_nameserver_oserror(nsproxy): assert 'Address already in use' in str(error.value) +@skip_windows_any_port def test_nameserver_permissionerror(): """ Name server start() should raise an error if it has not sufficient diff --git a/osbrain/tests/test_proxy.py b/osbrain/tests/test_proxy.py index 0099018..a41d23d 100644 --- a/osbrain/tests/test_proxy.py +++ b/osbrain/tests/test_proxy.py @@ -57,7 +57,7 @@ def test_wrong_nameserver_address(timeout): t0 = time.time() with pytest.raises(TimeoutError): locate_ns('127.0.0.1:1234', timeout=timeout) - assert timeout <= time.time() - t0 <= timeout + 1. + assert timeout <= time.time() - t0 <= timeout + 1.2 def test_no_timeout_locate_ns_existing(nsproxy): @@ -323,4 +323,4 @@ def shoot(self): assert time.time() - t0 < 0.2 assert not wayne._next_oneway - assert wait_agent_attr(wayne, value=20*['bang!'], timeout=1.2) + assert wait_agent_attr(wayne, value=20*['bang!'], timeout=1.5)