From aa0e0a90b0a73c02654a73e531291fb0d1087d78 Mon Sep 17 00:00:00 2001 From: Aaron Bach Date: Sun, 23 Jun 2019 15:04:23 -0600 Subject: [PATCH 1/9] Add support for Apache Kafka --- .../components/apache_kafka/__init__.py | 85 +++++++++++++++++++ .../components/apache_kafka/manifest.json | 10 +++ 2 files changed, 95 insertions(+) create mode 100644 homeassistant/components/apache_kafka/__init__.py create mode 100644 homeassistant/components/apache_kafka/manifest.json diff --git a/homeassistant/components/apache_kafka/__init__.py b/homeassistant/components/apache_kafka/__init__.py new file mode 100644 index 00000000000000..f78d25d06397c7 --- /dev/null +++ b/homeassistant/components/apache_kafka/__init__.py @@ -0,0 +1,85 @@ +"""Support for Apache Kafka.""" +from datetime import datetime +import json +import logging + +import voluptuous as vol + +from homeassistant.const import ( + CONF_IP_ADDRESS, CONF_PORT, EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED, + STATE_UNAVAILABLE, STATE_UNKNOWN) +import homeassistant.helpers.config_validation as cv +from homeassistant.helpers.entityfilter import FILTER_SCHEMA + +_LOGGER = logging.getLogger(__name__) + +DOMAIN = 'apache_kafka' + +CONF_FILTER = 'filter' +CONF_TOPIC = 'topic' + +CONFIG_SCHEMA = vol.Schema({ + DOMAIN: vol.Schema({ + vol.Required(CONF_IP_ADDRESS): cv.string, + vol.Required(CONF_PORT): cv.port, + vol.Required(CONF_TOPIC): cv.string, + vol.Optional(CONF_FILTER, default={}): FILTER_SCHEMA, + }), +}, extra=vol.ALLOW_EXTRA) + + +def setup(hass, config): + """Activate the Apache Kafka integration.""" + from aiokafka import AIOKafkaProducer + + conf = config[DOMAIN] + topic_name = conf[CONF_TOPIC] + entities_filter = conf[CONF_FILTER] + + producer = AIOKafkaProducer( + loop=hass.loop, + bootstrap_servers="{0}:{1}".format( + conf[CONF_IP_ADDRESS], conf[CONF_PORT]), + compression_type="gzip", + ) + + encoder = DateTimeJSONEncoder() + + async def send_to_pubsub(event): + """Send states to Pub/Sub.""" + await producer.start() + + state = event.data.get('new_state') + if (state is None + or state.state in (STATE_UNKNOWN, '', STATE_UNAVAILABLE) + or not entities_filter(state.entity_id)): + return + + as_dict = state.as_dict() + data = json.dumps( + obj=as_dict, + default=encoder.encode + ).encode('utf-8') + + try: + await producer.send_and_wait(topic_name, data) + finally: + producer.stop() + + hass.bus.listen(EVENT_HOMEASSISTANT_STOP, producer.stop()) + hass.bus.listen(EVENT_STATE_CHANGED, send_to_pubsub) + + return True + + +class DateTimeJSONEncoder(json.JSONEncoder): + """Encode python objects. + + Additionally add encoding for datetime objects as isoformat. + """ + + def default(self, o): # pylint: disable=E0202 + """Implement encoding logic.""" + if isinstance(o, datetime): + return o.isoformat() + return super().default(o) diff --git a/homeassistant/components/apache_kafka/manifest.json b/homeassistant/components/apache_kafka/manifest.json new file mode 100644 index 00000000000000..29d3d4bfcdcfd9 --- /dev/null +++ b/homeassistant/components/apache_kafka/manifest.json @@ -0,0 +1,10 @@ +{ + "domain": "apache_kafka", + "name": "Apache Kafka", + "documentation": "https://www.home-assistant.io/components/apache_kafka", + "requirements": [ + "aiokafka==0.5.1" + ], + "dependencies": [], + "codeowners": [] +} From fde4624e07a44edcaa728666777e7cfd789d68f6 Mon Sep 17 00:00:00 2001 From: Aaron Bach Date: Thu, 11 Jul 2019 12:41:10 -0600 Subject: [PATCH 2/9] Simplified --- .../components/apache_kafka/__init__.py | 99 ++++++++++++------- 1 file changed, 64 insertions(+), 35 deletions(-) diff --git a/homeassistant/components/apache_kafka/__init__.py b/homeassistant/components/apache_kafka/__init__.py index f78d25d06397c7..166982d9496a1f 100644 --- a/homeassistant/components/apache_kafka/__init__.py +++ b/homeassistant/components/apache_kafka/__init__.py @@ -1,8 +1,11 @@ """Support for Apache Kafka.""" +import asyncio from datetime import datetime import json import logging +from aiokafka import AIOKafkaProducer +from aiokafka.errors import KafkaError import voluptuous as vol from homeassistant.const import ( @@ -28,46 +31,24 @@ }, extra=vol.ALLOW_EXTRA) -def setup(hass, config): +async def async_setup(hass, config): """Activate the Apache Kafka integration.""" - from aiokafka import AIOKafkaProducer - conf = config[DOMAIN] - topic_name = conf[CONF_TOPIC] - entities_filter = conf[CONF_FILTER] - - producer = AIOKafkaProducer( - loop=hass.loop, - bootstrap_servers="{0}:{1}".format( - conf[CONF_IP_ADDRESS], conf[CONF_PORT]), - compression_type="gzip", - ) - encoder = DateTimeJSONEncoder() + kafka = hass.data[DOMAIN] = KafkaManager( + hass, + conf[CONF_IP_ADDRESS], + conf[CONF_PORT], + conf[CONF_TOPIC], + conf[CONF_FILTER]) - async def send_to_pubsub(event): - """Send states to Pub/Sub.""" - await producer.start() + hass.bus.listen(EVENT_HOMEASSISTANT_STOP, kafka.shutdown()) - state = event.data.get('new_state') - if (state is None - or state.state in (STATE_UNKNOWN, '', STATE_UNAVAILABLE) - or not entities_filter(state.entity_id)): - return - - as_dict = state.as_dict() - data = json.dumps( - obj=as_dict, - default=encoder.encode - ).encode('utf-8') - - try: - await producer.send_and_wait(topic_name, data) - finally: - producer.stop() - - hass.bus.listen(EVENT_HOMEASSISTANT_STOP, producer.stop()) - hass.bus.listen(EVENT_STATE_CHANGED, send_to_pubsub) + try: + await kafka.start() + except asyncio.TimeoutError: + _LOGGER.error('Timed out while connecting to Kafka') + return False return True @@ -83,3 +64,51 @@ def default(self, o): # pylint: disable=E0202 if isinstance(o, datetime): return o.isoformat() return super().default(o) + + +class KafkaManager: + """Define a manager to buffer events to Kafka.""" + + def __init__( + self, + hass, + ip_address, + port, + topic, + entities_filter): + """Initialize.""" + self._encoder = DateTimeJSONEncoder() + self._entities_filter = entities_filter + self._producer = AIOKafkaProducer( + loop=hass.loop, + bootstrap_servers="{0}:{1}".format(ip_address, port), + compression_type="gzip", + ) + self._topic = topic + + hass.bus.listen(EVENT_STATE_CHANGED, self._write_to_kafka) + + def _encode_event(self, event): + """Translate events into a binary JSON payload.""" + state = event.data.get('new_state') + if (state is None + or state.state in (STATE_UNKNOWN, '', STATE_UNAVAILABLE) + or not self._entities_filter(state.entity_id)): + return + + return json.dumps( + obj=state.as_dict(), + default=self._encoder.encode + ).encode('utf-8') + + async def _write_to_kafka(self, event): + """Write a binary payload to Kafka.""" + await self._producer.send_and_wait(self._topic, event) + + async def start(self): + """Start the Kafka manager.""" + asyncio.wait_for(self._producer.start(), timeout=5) + + async def shutdown(self): + """Shut the manager down.""" + await self._producer.stop() From 5ae57e64c29f9e1cdbba6545420f4c2003d664e2 Mon Sep 17 00:00:00 2001 From: Aaron Bach Date: Thu, 11 Jul 2019 12:54:18 -0600 Subject: [PATCH 3/9] Revert "Simplified" This reverts commit fde4624e07a44edcaa728666777e7cfd789d68f6. --- .../components/apache_kafka/__init__.py | 99 +++++++------------ 1 file changed, 35 insertions(+), 64 deletions(-) diff --git a/homeassistant/components/apache_kafka/__init__.py b/homeassistant/components/apache_kafka/__init__.py index 166982d9496a1f..f78d25d06397c7 100644 --- a/homeassistant/components/apache_kafka/__init__.py +++ b/homeassistant/components/apache_kafka/__init__.py @@ -1,11 +1,8 @@ """Support for Apache Kafka.""" -import asyncio from datetime import datetime import json import logging -from aiokafka import AIOKafkaProducer -from aiokafka.errors import KafkaError import voluptuous as vol from homeassistant.const import ( @@ -31,24 +28,46 @@ }, extra=vol.ALLOW_EXTRA) -async def async_setup(hass, config): +def setup(hass, config): """Activate the Apache Kafka integration.""" + from aiokafka import AIOKafkaProducer + conf = config[DOMAIN] + topic_name = conf[CONF_TOPIC] + entities_filter = conf[CONF_FILTER] + + producer = AIOKafkaProducer( + loop=hass.loop, + bootstrap_servers="{0}:{1}".format( + conf[CONF_IP_ADDRESS], conf[CONF_PORT]), + compression_type="gzip", + ) - kafka = hass.data[DOMAIN] = KafkaManager( - hass, - conf[CONF_IP_ADDRESS], - conf[CONF_PORT], - conf[CONF_TOPIC], - conf[CONF_FILTER]) + encoder = DateTimeJSONEncoder() - hass.bus.listen(EVENT_HOMEASSISTANT_STOP, kafka.shutdown()) + async def send_to_pubsub(event): + """Send states to Pub/Sub.""" + await producer.start() - try: - await kafka.start() - except asyncio.TimeoutError: - _LOGGER.error('Timed out while connecting to Kafka') - return False + state = event.data.get('new_state') + if (state is None + or state.state in (STATE_UNKNOWN, '', STATE_UNAVAILABLE) + or not entities_filter(state.entity_id)): + return + + as_dict = state.as_dict() + data = json.dumps( + obj=as_dict, + default=encoder.encode + ).encode('utf-8') + + try: + await producer.send_and_wait(topic_name, data) + finally: + producer.stop() + + hass.bus.listen(EVENT_HOMEASSISTANT_STOP, producer.stop()) + hass.bus.listen(EVENT_STATE_CHANGED, send_to_pubsub) return True @@ -64,51 +83,3 @@ def default(self, o): # pylint: disable=E0202 if isinstance(o, datetime): return o.isoformat() return super().default(o) - - -class KafkaManager: - """Define a manager to buffer events to Kafka.""" - - def __init__( - self, - hass, - ip_address, - port, - topic, - entities_filter): - """Initialize.""" - self._encoder = DateTimeJSONEncoder() - self._entities_filter = entities_filter - self._producer = AIOKafkaProducer( - loop=hass.loop, - bootstrap_servers="{0}:{1}".format(ip_address, port), - compression_type="gzip", - ) - self._topic = topic - - hass.bus.listen(EVENT_STATE_CHANGED, self._write_to_kafka) - - def _encode_event(self, event): - """Translate events into a binary JSON payload.""" - state = event.data.get('new_state') - if (state is None - or state.state in (STATE_UNKNOWN, '', STATE_UNAVAILABLE) - or not self._entities_filter(state.entity_id)): - return - - return json.dumps( - obj=state.as_dict(), - default=self._encoder.encode - ).encode('utf-8') - - async def _write_to_kafka(self, event): - """Write a binary payload to Kafka.""" - await self._producer.send_and_wait(self._topic, event) - - async def start(self): - """Start the Kafka manager.""" - asyncio.wait_for(self._producer.start(), timeout=5) - - async def shutdown(self): - """Shut the manager down.""" - await self._producer.stop() From ba12b0413591da5ed4e56fa6faaf66612f1ca56a Mon Sep 17 00:00:00 2001 From: Aaron Bach Date: Thu, 11 Jul 2019 12:56:31 -0600 Subject: [PATCH 4/9] Revert "Revert "Simplified"" This reverts commit 5ae57e64c29f9e1cdbba6545420f4c2003d664e2. --- .../components/apache_kafka/__init__.py | 99 ++++++++++++------- 1 file changed, 64 insertions(+), 35 deletions(-) diff --git a/homeassistant/components/apache_kafka/__init__.py b/homeassistant/components/apache_kafka/__init__.py index f78d25d06397c7..166982d9496a1f 100644 --- a/homeassistant/components/apache_kafka/__init__.py +++ b/homeassistant/components/apache_kafka/__init__.py @@ -1,8 +1,11 @@ """Support for Apache Kafka.""" +import asyncio from datetime import datetime import json import logging +from aiokafka import AIOKafkaProducer +from aiokafka.errors import KafkaError import voluptuous as vol from homeassistant.const import ( @@ -28,46 +31,24 @@ }, extra=vol.ALLOW_EXTRA) -def setup(hass, config): +async def async_setup(hass, config): """Activate the Apache Kafka integration.""" - from aiokafka import AIOKafkaProducer - conf = config[DOMAIN] - topic_name = conf[CONF_TOPIC] - entities_filter = conf[CONF_FILTER] - - producer = AIOKafkaProducer( - loop=hass.loop, - bootstrap_servers="{0}:{1}".format( - conf[CONF_IP_ADDRESS], conf[CONF_PORT]), - compression_type="gzip", - ) - encoder = DateTimeJSONEncoder() + kafka = hass.data[DOMAIN] = KafkaManager( + hass, + conf[CONF_IP_ADDRESS], + conf[CONF_PORT], + conf[CONF_TOPIC], + conf[CONF_FILTER]) - async def send_to_pubsub(event): - """Send states to Pub/Sub.""" - await producer.start() + hass.bus.listen(EVENT_HOMEASSISTANT_STOP, kafka.shutdown()) - state = event.data.get('new_state') - if (state is None - or state.state in (STATE_UNKNOWN, '', STATE_UNAVAILABLE) - or not entities_filter(state.entity_id)): - return - - as_dict = state.as_dict() - data = json.dumps( - obj=as_dict, - default=encoder.encode - ).encode('utf-8') - - try: - await producer.send_and_wait(topic_name, data) - finally: - producer.stop() - - hass.bus.listen(EVENT_HOMEASSISTANT_STOP, producer.stop()) - hass.bus.listen(EVENT_STATE_CHANGED, send_to_pubsub) + try: + await kafka.start() + except asyncio.TimeoutError: + _LOGGER.error('Timed out while connecting to Kafka') + return False return True @@ -83,3 +64,51 @@ def default(self, o): # pylint: disable=E0202 if isinstance(o, datetime): return o.isoformat() return super().default(o) + + +class KafkaManager: + """Define a manager to buffer events to Kafka.""" + + def __init__( + self, + hass, + ip_address, + port, + topic, + entities_filter): + """Initialize.""" + self._encoder = DateTimeJSONEncoder() + self._entities_filter = entities_filter + self._producer = AIOKafkaProducer( + loop=hass.loop, + bootstrap_servers="{0}:{1}".format(ip_address, port), + compression_type="gzip", + ) + self._topic = topic + + hass.bus.listen(EVENT_STATE_CHANGED, self._write_to_kafka) + + def _encode_event(self, event): + """Translate events into a binary JSON payload.""" + state = event.data.get('new_state') + if (state is None + or state.state in (STATE_UNKNOWN, '', STATE_UNAVAILABLE) + or not self._entities_filter(state.entity_id)): + return + + return json.dumps( + obj=state.as_dict(), + default=self._encoder.encode + ).encode('utf-8') + + async def _write_to_kafka(self, event): + """Write a binary payload to Kafka.""" + await self._producer.send_and_wait(self._topic, event) + + async def start(self): + """Start the Kafka manager.""" + asyncio.wait_for(self._producer.start(), timeout=5) + + async def shutdown(self): + """Shut the manager down.""" + await self._producer.stop() From 721544353eea67166bb0d3d70a407702861ceeb9 Mon Sep 17 00:00:00 2001 From: Aaron Bach Date: Thu, 11 Jul 2019 13:17:13 -0600 Subject: [PATCH 5/9] Completed --- .../components/apache_kafka/__init__.py | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/homeassistant/components/apache_kafka/__init__.py b/homeassistant/components/apache_kafka/__init__.py index 166982d9496a1f..650865ca63304f 100644 --- a/homeassistant/components/apache_kafka/__init__.py +++ b/homeassistant/components/apache_kafka/__init__.py @@ -1,5 +1,4 @@ """Support for Apache Kafka.""" -import asyncio from datetime import datetime import json import logging @@ -42,13 +41,9 @@ async def async_setup(hass, config): conf[CONF_TOPIC], conf[CONF_FILTER]) - hass.bus.listen(EVENT_HOMEASSISTANT_STOP, kafka.shutdown()) + hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, kafka.shutdown()) - try: - await kafka.start() - except asyncio.TimeoutError: - _LOGGER.error('Timed out while connecting to Kafka') - return False + await kafka.start() return True @@ -79,6 +74,7 @@ def __init__( """Initialize.""" self._encoder = DateTimeJSONEncoder() self._entities_filter = entities_filter + self._hass = hass self._producer = AIOKafkaProducer( loop=hass.loop, bootstrap_servers="{0}:{1}".format(ip_address, port), @@ -86,8 +82,6 @@ def __init__( ) self._topic = topic - hass.bus.listen(EVENT_STATE_CHANGED, self._write_to_kafka) - def _encode_event(self, event): """Translate events into a binary JSON payload.""" state = event.data.get('new_state') @@ -101,14 +95,18 @@ def _encode_event(self, event): default=self._encoder.encode ).encode('utf-8') - async def _write_to_kafka(self, event): - """Write a binary payload to Kafka.""" - await self._producer.send_and_wait(self._topic, event) - async def start(self): """Start the Kafka manager.""" - asyncio.wait_for(self._producer.start(), timeout=5) + self._hass.bus.async_listen(EVENT_STATE_CHANGED, self.write) + await self._producer.start() async def shutdown(self): """Shut the manager down.""" await self._producer.stop() + + async def write(self, event): + """Write a binary payload to Kafka.""" + payload = self._encode_event(event) + + if payload: + await self._producer.send_and_wait(self._topic, payload) From ddc07a42d68e3e7d54b318317165553871bf2c81 Mon Sep 17 00:00:00 2001 From: Aaron Bach Date: Thu, 11 Jul 2019 13:31:59 -0600 Subject: [PATCH 6/9] Updated requirements --- requirements_all.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/requirements_all.txt b/requirements_all.txt index c9a4e829ec669c..d211821e3ab3da 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -150,6 +150,9 @@ aiohue==1.9.1 # homeassistant.components.imap aioimaplib==0.7.15 +# homeassistant.components.apache_kafka +aiokafka==0.5.1 + # homeassistant.components.lifx aiolifx==0.6.7 From 151e870b977216122cdabf062414bb3b499e192f Mon Sep 17 00:00:00 2001 From: Aaron Bach Date: Thu, 11 Jul 2019 13:33:30 -0600 Subject: [PATCH 7/9] Updated .coveragerc --- .coveragerc | 1 + 1 file changed, 1 insertion(+) diff --git a/.coveragerc b/.coveragerc index 781b5d172794c2..ff02e55d1f9f65 100644 --- a/.coveragerc +++ b/.coveragerc @@ -34,6 +34,7 @@ omit = homeassistant/components/androidtv/* homeassistant/components/anel_pwrctrl/switch.py homeassistant/components/anthemav/media_player.py + homeassistant/components/apache_kafka/* homeassistant/components/apcupsd/* homeassistant/components/apple_tv/* homeassistant/components/aqualogic/* From cea8faaf869e3b44f9ab09ec58d9a454b1d6d4dd Mon Sep 17 00:00:00 2001 From: Aaron Bach Date: Thu, 11 Jul 2019 13:46:24 -0600 Subject: [PATCH 8/9] Removed unused import --- homeassistant/components/apache_kafka/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/homeassistant/components/apache_kafka/__init__.py b/homeassistant/components/apache_kafka/__init__.py index 650865ca63304f..e8617eaf317468 100644 --- a/homeassistant/components/apache_kafka/__init__.py +++ b/homeassistant/components/apache_kafka/__init__.py @@ -4,7 +4,6 @@ import logging from aiokafka import AIOKafkaProducer -from aiokafka.errors import KafkaError import voluptuous as vol from homeassistant.const import ( From 75f316eca125e9b3f587da4b2046cc0c49309be8 Mon Sep 17 00:00:00 2001 From: Aaron Bach Date: Thu, 11 Jul 2019 14:36:29 -0600 Subject: [PATCH 9/9] Updated codeowner --- CODEOWNERS | 1 + homeassistant/components/apache_kafka/manifest.json | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CODEOWNERS b/CODEOWNERS index 8117968cf11ca7..66d2b4ef8f5107 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -24,6 +24,7 @@ homeassistant/components/alpha_vantage/* @fabaff homeassistant/components/amazon_polly/* @robbiet480 homeassistant/components/ambiclimate/* @danielhiversen homeassistant/components/ambient_station/* @bachya +homeassistant/components/apache_kafka/* @bachya homeassistant/components/api/* @home-assistant/core homeassistant/components/aprs/* @PhilRW homeassistant/components/arcam_fmj/* @elupus diff --git a/homeassistant/components/apache_kafka/manifest.json b/homeassistant/components/apache_kafka/manifest.json index 29d3d4bfcdcfd9..ac36af7fa48fe4 100644 --- a/homeassistant/components/apache_kafka/manifest.json +++ b/homeassistant/components/apache_kafka/manifest.json @@ -6,5 +6,7 @@ "aiokafka==0.5.1" ], "dependencies": [], - "codeowners": [] + "codeowners": [ + "@bachya" + ] }