diff --git a/.gitignore b/.gitignore index 9948299..b7ba1e2 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,7 @@ files.txt MANIFEST run.py *.pem -build/ \ No newline at end of file +build/ +/caseta-bridge.crt +/caseta.crt +/caseta.key diff --git a/get_lutron_cert.py b/get_lutron_cert.py new file mode 100644 index 0000000..07828a2 --- /dev/null +++ b/get_lutron_cert.py @@ -0,0 +1,161 @@ +"""Guide the user through pairing and save the necessary files.""" + +# based on https://git.io/vxjpt +# original script by Mathieu Hofman + +# `python -m venv env` +# bash: `source ./env/bin/activate` +# powershell: `./env/scripts/activate.ps1` +# `pip install cryptography==2.1.3 requests==2.18.4` +# `python get_lutron_cert.py` + +# your client key -> caseta.key +# your client certificate -> caseta.crt +# your bridge certificate -> caseta-bridge.crt + +# when setting up Home Assistant, use the following configuration: +# lutron_caseta: +# host: +# keyfile: caseta.key +# certfile: caseta.crt +# ca_certs: caseta-bridge.crt + +import json +import re +import requests +import socket +import ssl + +from cryptography import x509 +from cryptography.x509.oid import NameOID +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.serialization import load_pem_private_key + +from urllib.parse import urlencode + +LOGIN_SERVER = "device-login.lutron.com" +APP_CLIENT_ID = ("e001a4471eb6152b7b3f35e549905fd8589dfcf57eb680b6fb37f20878c" + "28e5a") +APP_CLIENT_SECRET = ("b07fee362538d6df3b129dc3026a72d27e1005a3d1e5839eed5ed18" + "c63a89b27") +APP_OAUTH_REDIRECT_PAGE = "lutron_app_oauth_redirect" +CERT_SUBJECT = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Pennsylvania"), + x509.NameAttribute(NameOID.LOCALITY_NAME, "Coopersburg"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, + "Lutron Electronics Co., Inc."), + x509.NameAttribute(NameOID.COMMON_NAME, "Lutron Caseta App") +]) + +BASE_URL = "https://%s/" % LOGIN_SERVER +REDIRECT_URI = "https://%s/%s" % (LOGIN_SERVER, APP_OAUTH_REDIRECT_PAGE) + +AUTHORIZE_URL = ("%soauth/authorize?%s" % (BASE_URL, + urlencode({ + "client_id": APP_CLIENT_ID, + "redirect_uri": REDIRECT_URI, + "response_type": "code" + }))) + +try: + with open('caseta.key', 'rb') as f: + private_key = load_pem_private_key(f.read(), None, default_backend()) +except FileNotFoundError: + private_key = rsa.generate_private_key(public_exponent=65537, + key_size=2048, + backend=default_backend()) + with open('caseta.key', 'wb') as f: + f.write(private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + )) + +try: + with open('caseta.crt', 'rb') as f: + certificate = x509.load_pem_x509_certificate(f.read(), + default_backend()) +except FileNotFoundError: + csr = (x509.CertificateSigningRequestBuilder() + .subject_name(CERT_SUBJECT) + .sign(private_key, hashes.SHA256(), default_backend())) + + print("Open Browser and login at %s" % AUTHORIZE_URL) + + redirected_url = input("Enter the URL (of the \"error\" page you got " + "redirected to (or the code in the URL): ") + + oauth_code = re.sub(r'^(.*?code=){0,1}([0-9a-f]*)\s*$', r'\2', + redirected_url) + + if oauth_code == '': + raise "Invalid code" + + token = requests.post("%soauth/token" % BASE_URL, data={ + "code": oauth_code, + "client_id": APP_CLIENT_ID, + "client_secret": APP_CLIENT_SECRET, + "redirect_uri": REDIRECT_URI, + "grant_type": "authorization_code"}).json() + + if token["token_type"] != "bearer": + raise ("Received invalid token %s. Try generating a new code " + "(one time use).") % token + + access_token = token["access_token"] + + pairing_request_content = { + "remote_signs_app_certificate_signing_request": + csr.public_bytes(serialization.Encoding.PEM).decode('ASCII') + } + + pairing_response = requests.post( + "%sapi/v1/remotepairing/application/user" % BASE_URL, + json=pairing_request_content, + headers={ + "X-DeviceType": "Caseta,RA2Select", + "Authorization": "Bearer %s" % access_token + } + ).json() + + app_cert = pairing_response["remote_signs_app_certificate"] + remote_cert = pairing_response["local_signs_remote_certificate"] + + with open('caseta.crt', 'wb') as f: + f.write(app_cert.encode('ASCII')) + f.write(remote_cert.encode('ASCII')) + +server_addr = input("Enter the address of your Caseta bridge device: ") + +raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +ssl_socket = ssl.wrap_socket(raw_socket, keyfile='caseta.key', + certfile='caseta.crt', + ssl_version=ssl.PROTOCOL_TLSv1_2) +ssl_socket.connect((server_addr, 8081)) + +ca_der = ssl_socket.getpeercert(True) +ca_cert = x509.load_der_x509_certificate(ca_der, default_backend()) +with open('caseta-bridge.crt', 'wb') as f: + f.write(ca_cert.public_bytes(serialization.Encoding.PEM)) + +ssl_socket.send(("%s\r\n" % json.dumps({ + "CommuniqueType": "ReadRequest", + "Header": {"Url": "/server/1/status/ping"} +})).encode('UTF-8')) + +while True: + buffer = b'' + while not buffer.endswith(b'\r\n'): + buffer += ssl_socket.read() + + leap_response = json.loads(buffer.decode('UTF-8')) + if leap_response['CommuniqueType'] == 'ReadResponse': + break + +ssl_socket.close() + +print("Successfully connected to bridge, running LEAP Server version %s" % + leap_response['Body']['PingResponse']['LEAPVersion']) diff --git a/pylutron_caseta/smartbridge.py b/pylutron_caseta/smartbridge.py index fd50d34..0321247 100755 --- a/pylutron_caseta/smartbridge.py +++ b/pylutron_caseta/smartbridge.py @@ -310,7 +310,10 @@ def _load_devices(self): _LOG.debug("Loading devices") self._writer.write({ "CommuniqueType": "ReadRequest", "Header": {"Url": "/device"}}) - device_json = yield from self._reader.read() + while True: + device_json = yield from self._reader.read() + if device_json['CommuniqueType'] == 'ReadResponse': + break for device in device_json['Body']['Devices']: _LOG.debug(device) device_id = device['href'][device['href'].rfind('/') + 1:] @@ -337,7 +340,10 @@ def _load_scenes(self): self._writer.write({ "CommuniqueType": "ReadRequest", "Header": {"Url": "/virtualbutton"}}) - scene_json = yield from self._reader.read() + while True: + scene_json = yield from self._reader.read() + if scene_json['CommuniqueType'] == 'ReadResponse': + break for scene in scene_json['Body']['VirtualButtons']: _LOG.debug(scene) if scene['IsProgrammed']: