-
-
Notifications
You must be signed in to change notification settings - Fork 32.3k
/
Copy pathsensor.py
147 lines (117 loc) · 4.39 KB
/
sensor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""Support for Worx Landroid mower."""
import asyncio
import logging
import aiohttp
import async_timeout
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import CONF_HOST, CONF_PIN, CONF_TIMEOUT, PERCENTAGE
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
_LOGGER = logging.getLogger(__name__)
CONF_ALLOW_UNREACHABLE = "allow_unreachable"
DEFAULT_TIMEOUT = 5
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PIN): vol.All(vol.Coerce(str), vol.Match(r"\d{4}")),
vol.Optional(CONF_ALLOW_UNREACHABLE, default=True): cv.boolean,
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
}
)
ERROR_STATE = [
"blade-blocked",
"repositioning-error",
"wire-bounced",
"blade-blocked",
"outside-wire",
"mower-lifted",
"alarm-6",
"upside-down",
"alarm-8",
"collision-sensor-blocked",
"mower-tilted",
"charge-error",
"battery-error",
]
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Set up the Worx Landroid sensors."""
for typ in ("battery", "state"):
async_add_entities([WorxLandroidSensor(typ, config)])
class WorxLandroidSensor(Entity):
"""Implementation of a Worx Landroid sensor."""
def __init__(self, sensor, config):
"""Initialize a Worx Landroid sensor."""
self._state = None
self.sensor = sensor
self.host = config.get(CONF_HOST)
self.pin = config.get(CONF_PIN)
self.timeout = config.get(CONF_TIMEOUT)
self.allow_unreachable = config.get(CONF_ALLOW_UNREACHABLE)
self.url = f"http://{self.host}/jsondata.cgi"
@property
def name(self):
"""Return the name of the sensor."""
return f"worxlandroid-{self.sensor}"
@property
def state(self):
"""Return the state of the sensor."""
return self._state
@property
def unit_of_measurement(self):
"""Return the unit of measurement of the sensor."""
if self.sensor == "battery":
return PERCENTAGE
return None
async def async_update(self):
"""Update the sensor data from the mower."""
connection_error = False
try:
session = async_get_clientsession(self.hass)
with async_timeout.timeout(self.timeout):
auth = aiohttp.helpers.BasicAuth("admin", self.pin)
mower_response = await session.get(self.url, auth=auth)
except (asyncio.TimeoutError, aiohttp.ClientError):
if self.allow_unreachable is False:
_LOGGER.error("Error connecting to mower at %s", self.url)
connection_error = True
# connection error
if connection_error is True and self.allow_unreachable is False:
if self.sensor == "error":
self._state = "yes"
elif self.sensor == "state":
self._state = "connection-error"
# connection success
elif connection_error is False:
# set the expected content type to be text/html
# since the mover incorrectly returns it...
data = await mower_response.json(content_type="text/html")
# sensor battery
if self.sensor == "battery":
self._state = data["perc_batt"]
# sensor error
elif self.sensor == "error":
self._state = "no" if self.get_error(data) is None else "yes"
# sensor state
elif self.sensor == "state":
self._state = self.get_state(data)
else:
if self.sensor == "error":
self._state = "no"
@staticmethod
def get_error(obj):
"""Get the mower error."""
for i, err in enumerate(obj["allarmi"]):
if i != 2: # ignore wire bounce errors
if err == 1:
return ERROR_STATE[i]
return None
def get_state(self, obj):
"""Get the state of the mower."""
state = self.get_error(obj)
if state is None:
if obj["batteryChargerState"] == "charging":
return obj["batteryChargerState"]
return obj["state"]
return state