Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions appveyor.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
build: false

platform:
- x64

environment:
matrix:
- PYTHON_VERSION: 3.6
MINICONDA_DIRNAME: C:\Miniconda36-x64

- PYTHON_VERSION: 3.5
MINICONDA_DIRNAME: C:\Miniconda35-x64

- PYTHON_VERSION: 3.4
MINICONDA_DIRNAME: C:\Miniconda34-x64

matrix:
fast_finish: false

init:
- ECHO %PYTHON_VERSION% %MINICONDA_DIRNAME%

install:
- cmd: set "PATH=%MINICONDA_DIRNAME%;%MINICONDA_DIRNAME%\\Scripts;%PATH%"
- cmd: conda config --set always_yes true
- cmd: conda update --quiet conda
- cmd: conda info --all
- cmd: conda create --quiet --name conda-env-%PYTHON_VERSION% python=%PYTHON_VERSION%
- cmd: activate conda-env-%PYTHON_VERSION%
- cmd: pip install --disable-pip-version-check --user --upgrade pip
- cmd: pip install -e .
- cmd: pip install pytest pytest-xdist

test_script:
- cmd: pytest --color=yes -n 2 -sv
3 changes: 2 additions & 1 deletion docs/source/about.rst
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ always take this into account:
#. osBrain uses `pickle <https://docs.python.org/library/pickle.html>`_ for
serialization by default, which means that the system performance may as
well depend on this package. Serialization is configurable, though.
#. osBrain default transport is IPC by default, but :ref:`it can be changed
#. osBrain default transport is IPC for operating systems that provide UNIX
domain sockets, and TCP for the rest. :ref:`It can be changed
globally or configured specifically for each bind <transport_protocol>`.
Note, however, that when using TCP, the network may have a great impact
on performance.
3 changes: 2 additions & 1 deletion docs/source/transport_protocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ Transport protocol
Available transports
====================

Althought the default transport protocol is IPC, there are other transport
Althought the default transport protocol is IPC for operating systems that
provide UNIX domain sockets and TCP for the rest, there are other transport
protocols that can be used in osBrain:

- ``tcp``: common `TCP <https://en.wikipedia.org/wiki/Transmission_Control_Protocol>`_. Can always be used, and must be used to communicate agents running in
Expand Down
3 changes: 2 additions & 1 deletion osbrain/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ def mkdir(directory, **kwargs):

import Pyro4
Pyro4.config.SERIALIZERS_ACCEPTED.add('pickle')
Pyro4.config.SERIALIZERS_ACCEPTED.add('cloudpickle')
Pyro4.config.SERIALIZERS_ACCEPTED.add('dill')
Pyro4.config.SERIALIZER = 'dill'
Pyro4.config.SERIALIZER = 'cloudpickle'
Pyro4.config.THREADPOOL_SIZE = 16
Pyro4.config.SERVERTYPE = 'thread'
Pyro4.config.REQUIRE_EXPOSE = False
Expand Down
11 changes: 6 additions & 5 deletions osbrain/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ def _handle_loopback(self, message):
"""
Handle incoming messages in the loopback socket.
"""
header, data = dill.loads(message)
header, data = cloudpickle.loads(message)
if header == 'EXECUTE_METHOD':
method, args, kwargs = data
try:
Expand All @@ -316,7 +316,7 @@ def _handle_loopback_safe(self, data):
"""
Handle incoming messages in the _loopback_safe socket.
"""
method, args, kwargs = dill.loads(data)
method, args, kwargs = cloudpickle.loads(data)
try:
response = getattr(self, method)(*args, **kwargs)
except Exception as error:
Expand Down Expand Up @@ -345,7 +345,7 @@ def _loopback(self, header, data=None):
"""
if not self._running:
raise NotImplementedError()
data = dill.dumps((header, data))
data = cloudpickle.dumps((header, data))
return self._loopback_reqrep('inproc://loopback', data)

def safe_call(self, method, *args, **kwargs):
Expand All @@ -366,7 +366,7 @@ def safe_call(self, method, *args, **kwargs):
if not self._running:
raise RuntimeError(
'Agent must be running to safely execute methods!')
data = dill.dumps((method, args, kwargs))
data = cloudpickle.dumps((method, args, kwargs))
return self._loopback_reqrep('inproc://_loopback_safe', data)

def each(self, period, method, *args, alias=None, **kwargs):
Expand Down Expand Up @@ -1801,7 +1801,7 @@ def __init__(self, name='', nsaddr=None, addr=None, serializer=None,
self.nsaddr = nsaddr
self._serializer = serializer
self._transport = transport
self.base = base
self.base = cloudpickle.dumps(base)
self._shutdown_event = multiprocessing.Event()
self._queue = multiprocessing.Queue()
self._sigint = False
Expand All @@ -1817,6 +1817,7 @@ def run(self):
try:
ns = NSProxy(self.nsaddr)
self._daemon = Pyro4.Daemon(self._host, self.port)
self.base = cloudpickle.loads(self.base)
self.agent = self.base(name=self.name, host=self._host,
serializer=self._serializer,
transport=self._transport,
Expand Down
6 changes: 3 additions & 3 deletions osbrain/nameserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import time
import random
import multiprocessing
import cloudpickle

import Pyro4
from Pyro4.naming import BroadcastServer
Expand Down Expand Up @@ -78,7 +79,7 @@ class NameServerProcess(multiprocessing.Process):
def __init__(self, addr=None, base=NameServer):
super().__init__()
self._daemon = None
self._base = base
self._base = cloudpickle.dumps(base)
if isinstance(addr, int):
addr = '127.0.0.1:%s' % addr
self.addr = addr
Expand All @@ -91,8 +92,7 @@ def run(self):
"""
Begin execution of the name server process and start the main loop.
"""
# Capture SIGINT

self._base = cloudpickle.loads(self._base)
try:
Pyro4.naming.NameServer = self._base
self._daemon = Pyro4.naming.NameServerDaemon(self.host, self.port)
Expand Down
14 changes: 14 additions & 0 deletions osbrain/tests/common.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
import sys
import pytest
from pytest import mark

from osbrain import run_agent
from osbrain import run_logger
from osbrain import run_nameserver
from osbrain.helper import sync_agent_logger

skip_windows = mark.skipif(sys.platform == 'win32',
reason='Not supported on windows')
skip_windows_port_reuse = mark.skipif(sys.platform == 'win32',
reason='Windows allows port reuse')
skip_windows_any_port = mark.skipif(sys.platform == 'win32',
reason='Windows allows binding to well '
'known ports')
skip_windows_spawn = mark.skipif(sys.platform == 'win32',
reason='Windows does not support fork')
skip_windows_ipc = mark.skipif(sys.platform == 'win32',
reason='Windows does not support IPC')


def append_received(agent, message, topic=None):
agent.received.append(message)
Expand Down
11 changes: 9 additions & 2 deletions osbrain/tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@
from common import append_received
from common import set_received

from common import skip_windows_spawn
from common import skip_windows_any_port
from common import skip_windows_port_reuse


def test_agent_uuid():
"""
All agent identifiers should be unique strings.
"""
N = 1000
N = 100
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change? Is it too slow on Windows?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I don't understand why, but for some reason, it takes 12-13 seconds to complete on Windows and about 0.2 seconds on Linux. I thought N=100 would be enough for this test anyway

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I knew Windows processes were slow, but did not know they were that slow. The main difference is the way they are created (spawn vs. fork). Forking is definitely much faster, specially with the copy-on-write implemented in Linux.

Leave it with 100 then. 👍

bunch = set(Agent()._uuid for i in range(N))
assert len(bunch) == N
assert all(isinstance(identifier, bytes) for identifier in bunch)
Expand Down Expand Up @@ -265,6 +269,7 @@ def test_bind_tcp_addr_specific_port(nsproxy):
assert address.address.port == port


@skip_windows_spawn
@pytest.mark.parametrize('linger, sleep_time, should_receive', [
(2, 1, True),
(0.5, 1, False),
Expand Down Expand Up @@ -301,7 +306,7 @@ def on_init(self):
puller.bind('PULL', alias='pull', handler=append_received,
addr=address.address, transport='tcp')

assert should_receive == wait_agent_attr(puller, data='foo', timeout=.2)
assert should_receive == wait_agent_attr(puller, data='foo', timeout=1)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this timeout does not need to be changed for now as this test is skipped for Windows. When/if we remove the skip we will change the timeout if necessary.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this one does fail on Linux from time to time. I ran it in a loop yesterday, and it took me anywhere between 50 and 500 attempts to make it fail, but eventually it does.

It has failed a few times in travis:
https://travis-ci.org/ocaballeror/osbrain/jobs/346269407

I think 1 is a safer value.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ocaballeror Then ok. It seems changing the timeout does not actually change the test, will only make it a bit slower for the cases where should_receive == False, so it is fine for me. 😊

Should we put it as a separate commit? i.e.: "More lenient timeout in linger test". Simply because it is not really related to Windows tests.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense 👍

Rebasing this is going to be fun....



def test_pushpull(nsproxy):
Expand Down Expand Up @@ -599,6 +604,7 @@ def test_running_exception(nsproxy):
assert not agent.is_running()


@skip_windows_port_reuse
def test_agent_error_address_already_in_use(nsproxy):
"""
Running an agent should raise an error if address is already in use.
Expand All @@ -609,6 +615,7 @@ def test_agent_error_address_already_in_use(nsproxy):
assert 'Address already in use' in str(error.value)


@skip_windows_any_port
def test_agent_error_permission_denied(nsproxy):
"""
Running an agent should raise an error if it has not sufficient
Expand Down
3 changes: 3 additions & 0 deletions osbrain/tests/test_agent_ipc_sockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from osbrain.helper import wait_condition

from common import nsproxy # pragma: no flakes
from common import skip_windows_ipc

pytestmark = skip_windows_ipc


def test_agent_close_ipc_socket_agent_shutdown(nsproxy):
Expand Down
46 changes: 30 additions & 16 deletions osbrain/tests/test_agent_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,37 @@

from common import nsproxy # pragma: no flakes
from common import append_received
from common import skip_windows_spawn
from common import skip_windows_ipc


def test_agent_bind_transport_global(nsproxy):
def test_agent_bind_transport_platform_default(nsproxy):
"""
Test global default transport.
Default transport is platform-dependent.
"""
# Default transport
agent = run_agent('a0')
address = agent.bind('PUSH')
assert address.transport == 'ipc'
if os.name == 'posix':
assert address.transport == 'ipc'
else:
assert address.transport == 'tcp'

# Changing default global transport
osbrain.config['TRANSPORT'] = 'tcp'
agent = run_agent('a1')

@skip_windows_spawn
def test_agent_bind_transport_global(nsproxy):
"""
Test global default transport change.
"""
# Default transport is not `inproc`
agent = run_agent('a0')
address = agent.bind('PUSH')
assert address.transport == 'tcp'
assert address.transport != 'inproc'

osbrain.config['TRANSPORT'] = 'ipc'
agent = run_agent('a2')
# Changing default global transport to `inproc`
osbrain.config['TRANSPORT'] = 'inproc'
agent = run_agent('a1')
address = agent.bind('PUSH')
assert address.transport == 'ipc'
assert address.transport == 'inproc'


def test_agent_bind_transport_agent(nsproxy):
Expand All @@ -46,9 +56,9 @@ def test_agent_bind_transport_agent(nsproxy):
address = agent.bind('PUSH')
assert address.transport == 'tcp'

agent = run_agent('a1', transport='ipc')
agent = run_agent('a1', transport='inproc')
address = agent.bind('PUSH')
assert address.transport == 'ipc'
assert address.transport == 'inproc'


def test_agent_bind_transport_bind(nsproxy):
Expand All @@ -64,18 +74,21 @@ def test_agent_bind_transport_bind(nsproxy):
assert address.transport == 'inproc'


def test_agent_bind_given_address(nsproxy):
@skip_windows_ipc
def test_agent_bind_given_address_ipc(nsproxy):
"""
Test agent binding to an specified address using TCP and IPC transport
layers.
"""
agent = run_agent('a0')
# IPC
ipc_addr = str(uuid4())
address = agent.bind('PUSH', addr=ipc_addr, transport='ipc')
assert address.transport == 'ipc'
assert address.address.name == ipc_addr
# TCP


def test_agent_bind_given_address_tcp(nsproxy):
agent = run_agent('a0')
while True:
try:
# Bind to random port
Expand All @@ -90,6 +103,7 @@ def test_agent_bind_given_address(nsproxy):
assert address.address == tcp_addr


@skip_windows_ipc
def test_agent_ipc_from_different_folders(nsproxy):
"""
IPC should work well even when agents are run from different folders.
Expand Down
5 changes: 5 additions & 0 deletions osbrain/tests/test_nameserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from osbrain.nameserver import random_nameserver_process

from common import nsproxy # pragma: no flakes
from common import skip_windows_any_port
from common import skip_windows_port_reuse


def test_nameserver_ping(nsproxy):
Expand Down Expand Up @@ -326,6 +328,7 @@ def test_nameserver_agent_address(nsproxy):
assert nsproxy.addr('a1', 'bar') == addr1


@skip_windows_any_port
def test_random_nameserver_process():
"""
Basic random_nameserver_process function tests: port range and exceptions.
Expand All @@ -346,6 +349,7 @@ def test_random_nameserver_process():
random_nameserver_process(port_start=22, port_stop=22, timeout=0.5)


@skip_windows_port_reuse
def test_nameserver_oserror(nsproxy):
"""
Name server start() should raise an error if address is already in use.
Expand All @@ -356,6 +360,7 @@ def test_nameserver_oserror(nsproxy):
assert 'Address already in use' in str(error.value)


@skip_windows_any_port
def test_nameserver_permissionerror():
"""
Name server start() should raise an error if it has not sufficient
Expand Down
4 changes: 2 additions & 2 deletions osbrain/tests/test_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def test_wrong_nameserver_address(timeout):
t0 = time.time()
with pytest.raises(TimeoutError):
locate_ns('127.0.0.1:1234', timeout=timeout)
assert timeout <= time.time() - t0 <= timeout + 1.
assert timeout <= time.time() - t0 <= timeout + 1.2


def test_no_timeout_locate_ns_existing(nsproxy):
Expand Down Expand Up @@ -323,4 +323,4 @@ def shoot(self):
assert time.time() - t0 < 0.2
assert not wayne._next_oneway

assert wait_agent_attr(wayne, value=20*['bang!'], timeout=1.2)
assert wait_agent_attr(wayne, value=20*['bang!'], timeout=1.5)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this failing with the previous timeout?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe at some point, I wouldn't have changed it otherwise. I'll run it through appveyor again, maybe it needed longer timeouts before, since we were using 8 threads instead of 2.

Copy link
Copy Markdown
Member

@Peque Peque Feb 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It still fails if set back to 1.2?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, I ran it in a loop to see if it would eventually fail, and after 100-200 tests I've gotten it to fail a couple of times. 1.5 looks like a safer value.