Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions homeassistant/components/hassio/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from homeassistant.auth.models import User
from homeassistant.auth.providers import homeassistant as auth_ha
from homeassistant.components.http import KEY_HASS, KEY_HASS_USER, HomeAssistantView
from homeassistant.components.http.const import is_supervisor_unix_socket_request
from homeassistant.components.http.data_validator import RequestDataValidator
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
Expand Down Expand Up @@ -41,14 +42,18 @@ def __init__(self, hass: HomeAssistant, user: User) -> None:

def _check_access(self, request: web.Request) -> None:
"""Check if this call is from Supervisor."""
# Check caller IP
hassio_ip = os.environ["SUPERVISOR"].split(":")[0]
assert request.transport
if ip_address(request.transport.get_extra_info("peername")[0]) != ip_address(
hassio_ip
):
_LOGGER.error("Invalid auth request from %s", request.remote)
raise HTTPUnauthorized
# Requests over the Supervisor Unix socket are authenticated by the
# http auth middleware as the Supervisor user, so the caller-IP check
# below does not apply (and would crash, since `peername` is empty for
# Unix sockets). The user-ID check still runs to ensure only the
# Supervisor user can reach this endpoint.
if not is_supervisor_unix_socket_request(request):
hassio_ip = os.environ["SUPERVISOR"].split(":")[0]
assert request.transport
peername = request.transport.get_extra_info("peername")
Comment thread
agners marked this conversation as resolved.
if not peername or ip_address(peername[0]) != ip_address(hassio_ip):
_LOGGER.error("Invalid auth request from %s", request.remote)
raise HTTPUnauthorized
Comment on lines +50 to +56

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if not is_supervisor_unix_socket_request(request):
hassio_ip = os.environ["SUPERVISOR"].split(":")[0]
assert request.transport
peername = request.transport.get_extra_info("peername")
if not peername or ip_address(peername[0]) != ip_address(hassio_ip):
_LOGGER.error("Invalid auth request from %s", request.remote)
raise HTTPUnauthorized
if not is_supervisor_unix_socket_request(request):
hassio_ip = os.environ["SUPERVISOR"].split(":")[0]
if (
not (transport := request.transport)
or not (peername := transport.get_extra_info("peername"))
or ip_address(peername[0]) != ip_address(hassio_ip)
):
_LOGGER.error("Invalid auth request from %s", request.remote)
raise HTTPUnauthorized

I'd overprefer a walrus operator here to patch out the assert. What do you think? :)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the assert is existing code.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, merely spotted a potential improvement. :)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use the same pattern (assert) also in other places in the hassio integration.

From what I understand that happens if the connection closed. I am not sure how realistic this is in this particular case, the close must have happened between the request coming in and processing here 🤔

I'd rather prefer to address all sites at once, if we decide to change how to handle request.transport being None.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. I agree this does not need to block this PR, especially if the same pattern exists elsewhere.
If it's there purely to satisfy mypy, it should be fine. If we want to continuously check this in runtime, it would justify its own dedicated PR. :)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It got added with #74603 as a mypy defense, so I assume the latter.


# Check caller token
if request[KEY_HASS_USER].id != self.user.id:
Expand Down
48 changes: 47 additions & 1 deletion tests/components/hassio/test_auth.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
"""The tests for the hassio component."""

from http import HTTPStatus
from unittest.mock import Mock, patch
from unittest.mock import MagicMock, Mock, patch

from aiohttp.test_utils import TestClient
from aiohttp.web_exceptions import HTTPUnauthorized
import pytest

from homeassistant.auth.providers.homeassistant import InvalidAuth
from homeassistant.components.hassio.auth import HassIOBaseAuth
from homeassistant.components.hassio.const import DATA_CONFIG_STORE
from homeassistant.components.http import KEY_HASS_USER
from homeassistant.core import HomeAssistant


async def test_auth_success(hassio_client_supervisor: TestClient) -> None:
Expand Down Expand Up @@ -162,6 +168,46 @@ async def test_password_fails_no_auth(hassio_noauth_client: TestClient) -> None:
assert resp.status == HTTPStatus.UNAUTHORIZED


@pytest.mark.parametrize(
("peername", "unix_socket"),
[
# Unix socket transports report an empty string for peername. Before
# the fix this raised IndexError on `peername[0]`.
("", True),
# Defensive: a TCP transport with no peername at all should be
# rejected, not crash.
(None, False),
],
)
async def test_check_access_unix_socket_or_missing_peername(
hass: HomeAssistant,
hassio_stubs: None,
peername: str | None,
unix_socket: bool,
) -> None:
"""Test _check_access handles Unix socket requests and missing peername."""
hassio_user_id = hass.data[DATA_CONFIG_STORE].data.hassio_user
assert hassio_user_id is not None
user = await hass.auth.async_get_user(hassio_user_id)
assert user is not None

auth_view = HassIOBaseAuth(hass, user)

request = MagicMock()
request.transport.get_extra_info.return_value = peername
request.__getitem__.side_effect = lambda key: user if key is KEY_HASS_USER else None

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving the side effect to a test parameter that is a lambda.


with patch(
"homeassistant.components.hassio.auth.is_supervisor_unix_socket_request",
return_value=unix_socket,
):
if unix_socket:
auth_view._check_access(request)
else:
with pytest.raises(HTTPUnauthorized):
Comment thread
MartinHjelmare marked this conversation as resolved.
Outdated
auth_view._check_access(request)


async def test_password_no_user(hassio_client_supervisor: TestClient) -> None:
"""Test changing password for invalid user."""
resp = await hassio_client_supervisor.post(
Expand Down