Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions aiodocker/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,13 +466,16 @@ def __init__(self, docker):
self.docker = docker
self.channel = Channel()
self.json_stream = None
self.task = None

def listen(self):
warnings.warn("use subscribe() method instead",
DeprecationWarning, stacklevel=2)
return self.channel.subscribe()

def subscribe(self):
def subscribe(self, create_task=True):
if create_task:
self.task = asyncio.ensure_future(self.run())
return self.channel.subscribe()

def _transform_event(self, data):
Expand Down Expand Up @@ -500,20 +503,25 @@ async def run(self, **params):
self._transform_event,
human_bool(params['stream']),
)
async for data in self.json_stream:
await self.channel.publish(data)
try:
async for data in self.json_stream:
await self.channel.publish(data)
finally:
await self.json_stream._close()
self.json_stream = None
finally:
# signal termination to subscribers
await self.channel.publish(None)
try:
await self.json_stream.close()
except:
pass
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@barrachri the error in the close() used to be hidden by this try...except: pass

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, it seems harmless, I'll check this .close() in the future PRs.

self.json_stream = None

async def stop(self):
if self.json_stream:
await self.json_stream.close()
if self.json_stream is not None:
await self.json_stream._close()
if self.task:
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
pass


class DockerLog:
Expand Down
2 changes: 1 addition & 1 deletion aiodocker/jsonstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async def _close(self):
# (see https://github.com/KeepSafe/aiohttp/issues/739)

# response error , it has been closed
await self._response.close()
self._response.close()
Copy link

@barrachri barrachri Jul 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you forget await?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I actually fixed it. This is no coroutine apparently and the try...except...pass on the top level was hiding the exception. Commit message: 1d3abd8



async def json_stream_result(response, transform=None, stream=True):
Expand Down
24 changes: 24 additions & 0 deletions tests/test_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import asyncio

import pytest


def test_events_default_task(docker):
loop = asyncio.get_event_loop()
docker.events.subscribe()
assert docker.events.task is not None
loop.run_until_complete(docker.events.stop())
assert docker.events.task.done()
assert docker.events.json_stream is None


def test_events_provided_task(docker):
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(docker.events.run())
docker.events.subscribe(create_task=False)
assert docker.events.task is None
loop.run_until_complete(docker.events.stop())
assert docker.events.json_stream is None
task.cancel()
with pytest.raises(asyncio.CancelledError):
loop.run_until_complete(task)
4 changes: 2 additions & 2 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ async def test_port(docker, testing_images, redis_container):

@pytest.mark.asyncio
async def test_events(docker, testing_images, event_loop):
monitor_task = event_loop.create_task(docker.events.run())
subscriber = docker.events.subscribe()

# Do some stuffs to generate events.
Expand Down Expand Up @@ -231,4 +230,5 @@ async def test_events(docker, testing_images, event_loop):
break

assert events_occurred == ['create', 'start', 'kill', 'die', 'destroy']
monitor_task.cancel()

await docker.events.stop()