Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions homeassistant/helpers/location.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
"""Location helpers for Home Assistant."""

import logging
from typing import Optional, Sequence

import voluptuous as vol

from homeassistant.const import ATTR_LATITUDE, ATTR_LONGITUDE
from homeassistant.core import State
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.util import location as loc_util

_LOGGER = logging.getLogger(__name__)


def has_location(state: State) -> bool:
"""Test if state contains a valid location.
Expand Down Expand Up @@ -41,3 +48,61 @@ def closest(
longitude,
),
)


def find_coordinates(
hass: HomeAssistantType, entity_id: str, recursion_history: Optional[list] = None
) -> Optional[str]:
"""Find the gps coordinates of the entity in the form of '90.000,180.000'."""
entity_state = hass.states.get(entity_id)

if entity_state is None:
_LOGGER.error("Unable to find entity %s", entity_id)
return None

# Check if the entity has location attributes
if has_location(entity_state):
return _get_location_from_attributes(entity_state)

# Check if device is in a zone
zone_entity = hass.states.get(f"zone.{entity_state.state}")
if has_location(zone_entity): # type: ignore
_LOGGER.debug(
"%s is in %s, getting zone location", entity_id, zone_entity.entity_id # type: ignore
)
return _get_location_from_attributes(zone_entity) # type: ignore

# Resolve nested entity
if recursion_history is None:
recursion_history = []
recursion_history.append(entity_id)
if entity_state.state in recursion_history:
_LOGGER.error(
"Circular reference detected while trying to find coordinates of an entity. The state of %s has already been checked",
entity_state.state,
)
return None
_LOGGER.debug("Getting nested entity for state: %s", entity_state.state)
nested_entity = hass.states.get(entity_state.state)
if nested_entity is not None:
_LOGGER.debug("Resolving nested entity_id: %s", entity_state.state)
return find_coordinates(hass, entity_state.state, recursion_history)

# Check if state is valid coordinate set
try:
cv.gps(entity_state.state.split(","))
except vol.Invalid:
_LOGGER.error(
"Entity %s does not contain a location and does not point at an entity that does: %s",
entity_id,
entity_state.state,
)
return None
else:
return entity_state.state


def _get_location_from_attributes(entity_state: State) -> str:
"""Get the lat/long string from an entities attributes."""
attr = entity_state.attributes
return "{},{}".format(attr.get(ATTR_LATITUDE), attr.get(ATTR_LONGITUDE))
61 changes: 61 additions & 0 deletions tests/helpers/test_location.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,64 @@ def test_closest_returns_closest():
state2 = State("light.test", "on", {ATTR_LATITUDE: 125.45, ATTR_LONGITUDE: 125.45})

assert state == location.closest(123.45, 123.45, [state, state2])


async def test_coordinates_function_as_attributes(hass):
"""Test coordinates function."""
hass.states.async_set(
"test.object", "happy", {"latitude": 32.87336, "longitude": -117.22943}
)
assert location.find_coordinates(hass, "test.object") == "32.87336,-117.22943"


async def test_coordinates_function_as_state(hass):
"""Test coordinates function."""
hass.states.async_set("test.object", "32.87336,-117.22943")
assert location.find_coordinates(hass, "test.object") == "32.87336,-117.22943"


async def test_coordinates_function_device_tracker_in_zone(hass):
"""Test coordinates function."""
hass.states.async_set(
"zone.home", "zoning", {"latitude": 32.87336, "longitude": -117.22943},
)
hass.states.async_set("device_tracker.device", "home")
assert (
location.find_coordinates(hass, "device_tracker.device")
== "32.87336,-117.22943"
)


async def test_coordinates_function_device_tracker_from_input_select(hass):
"""Test coordinates function."""
hass.states.async_set(
"input_select.select",
"device_tracker.device",
{"options": "device_tracker.device"},
)
hass.states.async_set("device_tracker.device", "32.87336,-117.22943")
assert (
location.find_coordinates(hass, "input_select.select") == "32.87336,-117.22943"
)


def test_coordinates_function_returns_none_on_recursion(hass):
"""Test coordinates function."""
hass.states.async_set(
"test.first", "test.second",
)
hass.states.async_set("test.second", "test.first")
assert location.find_coordinates(hass, "test.first") is None


async def test_coordinates_function_returns_none_if_invalid_coord(hass):
"""Test test_coordinates function."""
hass.states.async_set(
"test.object", "abc",
)
assert location.find_coordinates(hass, "test.object") is None


def test_coordinates_function_returns_none_if_invalid_input(hass):
"""Test test_coordinates function."""
assert location.find_coordinates(hass, "test.abc") is None