diff --git a/aiodocker/docker.py b/aiodocker/docker.py index f9389059..c3cef8b6 100644 --- a/aiodocker/docker.py +++ b/aiodocker/docker.py @@ -466,13 +466,16 @@ def __init__(self, docker): self.docker = docker self.channel = Channel() self.json_stream = None + self.task = None def listen(self): warnings.warn("use subscribe() method instead", DeprecationWarning, stacklevel=2) return self.channel.subscribe() - def subscribe(self): + def subscribe(self, create_task=True): + if create_task: + self.task = asyncio.ensure_future(self.run()) return self.channel.subscribe() def _transform_event(self, data): @@ -500,20 +503,25 @@ async def run(self, **params): self._transform_event, human_bool(params['stream']), ) - async for data in self.json_stream: - await self.channel.publish(data) + try: + async for data in self.json_stream: + await self.channel.publish(data) + finally: + await self.json_stream._close() + self.json_stream = None finally: # signal termination to subscribers await self.channel.publish(None) - try: - await self.json_stream.close() - except: - pass - self.json_stream = None async def stop(self): - if self.json_stream: - await self.json_stream.close() + if self.json_stream is not None: + await self.json_stream._close() + if self.task: + self.task.cancel() + try: + await self.task + except asyncio.CancelledError: + pass class DockerLog: diff --git a/aiodocker/jsonstream.py b/aiodocker/jsonstream.py index 6a442c70..b5c752dc 100644 --- a/aiodocker/jsonstream.py +++ b/aiodocker/jsonstream.py @@ -39,7 +39,7 @@ async def _close(self): # (see https://github.com/KeepSafe/aiohttp/issues/739) # response error , it has been closed - await self._response.close() + self._response.close() async def json_stream_result(response, transform=None, stream=True): diff --git a/tests/test_events.py b/tests/test_events.py new file mode 100644 index 00000000..31b2a5eb --- /dev/null +++ b/tests/test_events.py @@ -0,0 +1,24 @@ +import asyncio + +import pytest + + +def test_events_default_task(docker): + loop = asyncio.get_event_loop() + docker.events.subscribe() + assert docker.events.task is not None + loop.run_until_complete(docker.events.stop()) + assert docker.events.task.done() + assert docker.events.json_stream is None + + +def test_events_provided_task(docker): + loop = asyncio.get_event_loop() + task = asyncio.ensure_future(docker.events.run()) + docker.events.subscribe(create_task=False) + assert docker.events.task is None + loop.run_until_complete(docker.events.stop()) + assert docker.events.json_stream is None + task.cancel() + with pytest.raises(asyncio.CancelledError): + loop.run_until_complete(task) diff --git a/tests/test_integration.py b/tests/test_integration.py index 5ff91bbe..38540739 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -203,7 +203,6 @@ async def test_port(docker, testing_images, redis_container): @pytest.mark.asyncio async def test_events(docker, testing_images, event_loop): - monitor_task = event_loop.create_task(docker.events.run()) subscriber = docker.events.subscribe() # Do some stuffs to generate events. @@ -231,4 +230,5 @@ async def test_events(docker, testing_images, event_loop): break assert events_occurred == ['create', 'start', 'kill', 'die', 'destroy'] - monitor_task.cancel() + + await docker.events.stop()