Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 19 additions & 31 deletions homeassistant/components/camera/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,32 +301,23 @@ async def write_to_mjpeg_stream(img_bytes):

last_image = None

try:
while True:
img_bytes = await self.async_camera_image()
if not img_bytes:
break

if img_bytes and img_bytes != last_image:
await write_to_mjpeg_stream(img_bytes)

# Chrome seems to always ignore first picture,
# print it twice.
if last_image is None:
await write_to_mjpeg_stream(img_bytes)
while True:
img_bytes = await self.async_camera_image()
if not img_bytes:
break

last_image = img_bytes
if img_bytes and img_bytes != last_image:
await write_to_mjpeg_stream(img_bytes)

await asyncio.sleep(interval)
# Chrome seems to always ignore first picture,
# print it twice.
if last_image is None:
await write_to_mjpeg_stream(img_bytes)
last_image = img_bytes

except asyncio.CancelledError:
_LOGGER.debug("Stream closed by frontend.")
response = None
raise
await asyncio.sleep(interval)

finally:
if response is not None:
await response.write_eof()
return response

async def handle_async_mjpeg_stream(self, request):
"""Serve an HTTP MJPEG stream from the camera.
Expand Down Expand Up @@ -409,10 +400,9 @@ async def get(self, request, entity_id):
request.query.get('token') in camera.access_tokens)

if not authenticated:
return web.Response(status=401)
raise web.HTTPUnauthorized()

response = await self.handle(request, camera)
return response
return await self.handle(request, camera)

async def handle(self, request, camera):
"""Handle the camera request."""
Expand All @@ -435,7 +425,7 @@ async def handle(self, request, camera):
return web.Response(body=image,
content_type=camera.content_type)

return web.Response(status=500)
raise web.HTTPInternalServerError()


class CameraMjpegStream(CameraView):
Expand All @@ -448,19 +438,17 @@ async def handle(self, request, camera):
"""Serve camera stream, possibly with interval."""
interval = request.query.get('interval')
if interval is None:
await camera.handle_async_mjpeg_stream(request)
return
return await camera.handle_async_mjpeg_stream(request)

try:
# Compose camera stream from stills
interval = float(request.query.get('interval'))
if interval < MIN_STREAM_INTERVAL:
raise ValueError("Stream interval must be be > {}"
.format(MIN_STREAM_INTERVAL))
await camera.handle_async_still_stream(request, interval)
return
return await camera.handle_async_still_stream(request, interval)
except ValueError:
return web.Response(status=400)
raise web.HTTPBadRequest()


@callback
Expand Down
24 changes: 12 additions & 12 deletions homeassistant/components/camera/ffmpeg.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
})


@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
async def async_setup_platform(hass, config, async_add_devices,
discovery_info=None):
"""Set up a FFmpeg camera."""
if not hass.data[DATA_FFMPEG].async_run_test(config.get(CONF_INPUT)):
return
Expand All @@ -49,30 +49,30 @@ def __init__(self, hass, config):
self._input = config.get(CONF_INPUT)
self._extra_arguments = config.get(CONF_EXTRA_ARGUMENTS)

@asyncio.coroutine
def async_camera_image(self):
async def async_camera_image(self):
"""Return a still image response from the camera."""
from haffmpeg import ImageFrame, IMAGE_JPEG
ffmpeg = ImageFrame(self._manager.binary, loop=self.hass.loop)

image = yield from asyncio.shield(ffmpeg.get_image(
image = await asyncio.shield(ffmpeg.get_image(
self._input, output_format=IMAGE_JPEG,
extra_cmd=self._extra_arguments), loop=self.hass.loop)
return image

@asyncio.coroutine
def handle_async_mjpeg_stream(self, request):
async def handle_async_mjpeg_stream(self, request):
"""Generate an HTTP MJPEG stream from the camera."""
from haffmpeg import CameraMjpeg

stream = CameraMjpeg(self._manager.binary, loop=self.hass.loop)
yield from stream.open_camera(
await stream.open_camera(
self._input, extra_cmd=self._extra_arguments)

yield from async_aiohttp_proxy_stream(
self.hass, request, stream,
'multipart/x-mixed-replace;boundary=ffserver')
yield from stream.close()
try:
return await async_aiohttp_proxy_stream(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Await inside coroutine.

This comment was marked as resolved.

self.hass, request, stream,
'multipart/x-mixed-replace;boundary=ffserver')
finally:
await stream.close()

@property
def name(self):
Expand Down
7 changes: 3 additions & 4 deletions homeassistant/components/camera/mjpeg.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,18 @@ def camera_image(self):
with closing(req) as response:
return extract_image_from_mjpeg(response.iter_content(102400))

@asyncio.coroutine
def handle_async_mjpeg_stream(self, request):
async def handle_async_mjpeg_stream(self, request):
"""Generate an HTTP MJPEG stream from the camera."""
# aiohttp don't support DigestAuth -> Fallback
if self._authentication == HTTP_DIGEST_AUTHENTICATION:
yield from super().handle_async_mjpeg_stream(request)
await super().handle_async_mjpeg_stream(request)
return

# connect to stream
websession = async_get_clientsession(self.hass)
stream_coro = websession.get(self._mjpeg_url, auth=self._auth)

yield from async_aiohttp_proxy_web(self.hass, request, stream_coro)
return await async_aiohttp_proxy_web(self.hass, request, stream_coro)

@property
def name(self):
Expand Down
13 changes: 4 additions & 9 deletions homeassistant/components/camera/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ async def handle_async_mjpeg_stream(self, request):
stream_coro = websession.get(url, headers=self._headers)

if not self._stream_opts:
await async_aiohttp_proxy_web(self.hass, request, stream_coro)
return
return await async_aiohttp_proxy_web(
self.hass, request, stream_coro)

response = aiohttp.web.StreamResponse()
response.content_type = ('multipart/x-mixed-replace; '
Expand Down Expand Up @@ -229,15 +229,10 @@ async def write(img_bytes):
_resize_image, image, self._stream_opts)
await write(image)
data = data[jpg_end + 2:]
except asyncio.CancelledError:
_LOGGER.debug("Stream closed by frontend.")
finally:
req.close()
response = None
raise

finally:
if response is not None:
await response.write_eof()
return response

@property
def name(self):
Expand Down
19 changes: 7 additions & 12 deletions homeassistant/helpers/aiohttp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,13 @@ def async_create_clientsession(hass, verify_ssl=True, auto_cleanup=True,
return clientsession


@asyncio.coroutine
@bind_hass
def async_aiohttp_proxy_web(hass, request, web_coro, buffer_size=102400,
timeout=10):
async def async_aiohttp_proxy_web(hass, request, web_coro,
buffer_size=102400, timeout=10):
"""Stream websession request to aiohttp web response."""
try:
with async_timeout.timeout(timeout, loop=hass.loop):
req = yield from web_coro
req = await web_coro

except asyncio.CancelledError:
# The user cancelled the request
Expand All @@ -88,7 +87,7 @@ def async_aiohttp_proxy_web(hass, request, web_coro, buffer_size=102400,
raise HTTPBadGateway() from err

try:
yield from async_aiohttp_proxy_stream(
return await async_aiohttp_proxy_stream(
hass,
request,
req.content,
Expand All @@ -112,19 +111,15 @@ async def async_aiohttp_proxy_stream(hass, request, stream, content_type,
data = await stream.read(buffer_size)

if not data:
await response.write_eof()
break

await response.write(data)

except (asyncio.TimeoutError, aiohttp.ClientError):
# Something went wrong fetching data, close connection gracefully
await response.write_eof()

except asyncio.CancelledError:
# The user closed the connection
# Something went wrong fetching data, closed connection
pass

return response


@callback
def _async_register_clientsession_shutdown(hass, clientsession):
Expand Down