Sunday, 3 December 2017

F--k you Lutron!


I've expanded my Lutron Caseta system with a 2nd dimmer switch (I'm not a huge fan of the switches, the buttons feel kind of cheap and they can be confusing to use) but they are offer 2-way communication, they work reliably and they do not require a neutral wire which is a big plus.for older homes like mine.

So my Lutron App stopped working a while ago because my hub needed an update which I figured was a bad idea(TM) and since I rarely used the app anyway it didn't seem that important.

Then I decided to do an all around firmware update including the hub and my fears were confirmed.  Since a pro hub that has an API costs twice as much I wasn't really running out to replace what I had.

Lutron has removed the SSH access (well it's still listening on the SSH port but it doesn't accept the key that was floating around the internet anymore) and replaced it's app communication with a SSL based web type service called LEAP.

On the plus side this method is more secure than having a single key that works on every hub, but on the down side you need their server to create a certificate for you now to gain access to the LEAP service.

Fortunately the folks at Home Assistant along with the folks over a Github maintaining the pylutron-caseta module have released an update and a script for fetching the key/cert files for your hub from Lutron.

So first you need to get your private key file, as well as a certificate with your devices MAC in it signed by Lutron and the CA certificate from Lutron.   You will also need the local CA certificate of your hub.  These are all valid until 2038 so you should really only need to do this once if you block your device from internet access which I recommend you do.  I'm not sure what will happen when the certs expire but you may need to offer up a fake NTP server with an old date if Lutron isn't around or has discontinued this service by then.

There is a little Python script you can get here which will have you sign into your Lutron account and get the OAuth string so it can request the required keys and certificates.  It will then write out the files for you.  If you are reading this I suggest you get these files even if you don't need them as they may make it harder to get in the future.

The newest pylutron_caseta on Github has been written for Python 3 and I really don't know enough Py3 to re-write my whole app... I fought with it for a while and I found an intermediate update which will switch the existing Py2.7 code from SSH to SSL, so I have updated my library and re-written a small portion of my code to work with the new one. 

One item of note is that you need to "ping" the hub every once in a while (I believe 15 mins is the timeout) via the web interface or it will stop sending you fresh data and you will need to close and re-open the connection.

I had a hell of a time finding any documentation, support or example code for this library so there was a lot of trial and error involved as well as digging through source and HA's modules.

This code works on both Windows and Linux but you will need Python 2.7.9 or higher as older versions don't support the SSL version used by the hub.

So here is the step by step guide with code:
1) Download the zip from HA linked above to get your 3 certificates and your private key.  Run the script and follow the instructions.  It will confirm it was able to communicate with your bridge and output the following files: caseta.key, caseta-bridge.crt and caseta.crt.

2)  Create the smarbridge.py module with the following code:

# I did not write this code, this came from Github, I just added the ping stuff to it.

"""Provides an API to interact with the Lutron Caseta Smart Bridge."""

import json
import logging
import threading
import ssl
import socket

#from pylutron_caseta import _LEAP_DEVICE_TYPES

_LEAP_DEVICE_TYPES = {'light': ['WallDimmer', 'PlugInDimmer'],
                      'switch': ['WallSwitch'],
                      'cover': ['SerenaHoneycombShade', 'SerenaRollerShade',
                                'TriathlonHoneycombShade',
                                'TriathlonRollerShade', 'QsWirelessShade'],
                      'sensor': ['Pico1Button', 'Pico2Button',
                                 'Pico2ButtonRaiseLower', 'Pico3Button',
                                 'Pico3ButtonRaiseLower', 'Pico4Button',
                                 'Pico4ButtonScene', 'Pico4ButtonZone',
                                 'Pico4Button2Group', 'FourGroupRemote']}



_LOG = logging.getLogger('smartbridge')
_LOG.setLevel(logging.DEBUG)


class Smartbridge:
    """
    A representation of the Lutron Caseta Smart Bridge.

    It uses an SSH interface known as the LEAP server.
    """

    def __init__(self, hostname, keyfile, certfile, ca_certs):
        """Initialize the Smart Bridge."""
        self.devices = {}
        self.scenes = {}
        self._hostname = hostname
        self._keyfile = keyfile
        self._certfile = certfile
        self._ca_certs = ca_certs
        self.logged_in = False
        self._ssl_sock = None
        self._login()
        self._load_devices()
        self._load_scenes()
        _LOG.debug(self.devices)
        _LOG.debug(self.scenes)
        monitor = threading.Thread(target=self._monitor)
        monitor.setDaemon(True)
        monitor.start()
        for _id in self.devices:
            self.get_value(_id)

        self._subscribers = {}

    def add_subscriber(self, device_id, callback_):
        """
        Add a listener to be notified of state changes.

        :param device_id: device id, e.g. 5
        :param callback_: callback to invoke
        """
        self._subscribers[device_id] = callback_

    def get_devices(self):
        """Will return all known devices connected to the Smart Bridge."""
        return self.devices

    def get_devices_by_domain(self, domain):
        """
        Return a list of devices for the given domain.

        :param domain: one of 'light', 'switch', 'cover' or 'sensor'
        :returns list of zero or more of the devices
        """
        devs = []

        # return immediately if not a supported domain
        if domain not in _LEAP_DEVICE_TYPES:
            return devs

        # loop over all devices and check their type
        for device_id in self.devices:
            if self.devices[device_id]['type'] in _LEAP_DEVICE_TYPES[domain]:
                devs.append(self.devices[device_id])
        return devs

    def get_devices_by_type(self, type_):
        """
        Will return all devices of a given device type.

        :param type_: LEAP device type, e.g. WallSwitch
        """
        devs = []
        for device_id in self.devices:
            if self.devices[device_id]['type'] == type_:
                devs.append(self.devices[device_id])
        return devs

    def get_devices_by_types(self, types):
        """
        Will return all devices of for a list of given device types.

        :param types: list of LEAP device types such as WallSwitch, WallDimmer
        """
        devs = []
        for device_id in self.devices:
            if self.devices[device_id]['type'] in types:
                devs.append(self.devices[device_id])
        return devs

    def get_device_by_id(self, device_id):
        """
        Will return a device with the given ID.

        :param device_id: device id, e.g. 5
        """
        return self.devices[device_id]

    def get_scenes(self):
        """Will return all known scenes from the Smart Bridge."""
        return self.scenes

    def get_scene_by_id(self, scene_id):
        """
        Will return a scene with the given scene ID.

        :param scene_id: scene id, e.g 23
        """
        return self.scenes[scene_id]

    def ping(self):
"""
Pings the device to keep alive
"""
        cmd = '{"CommuniqueType":"ReadRequest",' \
              '"Header":{"Url":"/server/1/status/ping"}}\n'
        return self._send_command(cmd)


    def get_value(self, device_id):
        """
        Will return the current level value for the device with the given ID.

        :param device_id: device id, e.g. 5
        :returns level value from 0 to 100
        :rtype int
        """
        zone_id = self._get_zone_id(device_id)
        cmd = '{"CommuniqueType":"ReadRequest",' \
              '"Header":{"Url":"/zone/%s/status"}}\n' % zone_id
        if zone_id:
            return self._send_command(cmd)

    def is_connected(self):
        """Will return True if currently connected to the Smart Bridge."""
        return self.logged_in

    def is_on(self, device_id):
        """
        Will return True is the device with the given ID is 'on'.

        :param device_id: device id, e.g. 5
        :returns True if level is greater than 0 level, False otherwise
        """
        return self.devices[device_id]['current_state'] > 0

    def set_value(self, device_id, value):
        """
        Will set the value for a device with the given ID.

        :param device_id: device id to set the value on
        :param value: integer value from 0 to 100 to set
        """
        zone_id = self._get_zone_id(device_id)
        if zone_id:
            cmd = '{"CommuniqueType":"CreateRequest",' \
                  '"Header":{"Url":"/zone/%s/commandprocessor"},' \
                  '"Body":{"Command":{"CommandType":"GoToLevel",' \
                  '"Parameter":[{"Type":"Level",' \
                  '"Value":%s}]}}}\n' % (zone_id, value)
            return self._send_command(cmd)

    def turn_on(self, device_id):
        """
        Will turn 'on' the device with the given ID.

        :param device_id: device id to turn on
        """
        return self.set_value(device_id, 100)

    def turn_off(self, device_id):
        """
        Will turn 'off' the device with the given ID.

        :param device_id: device id to turn off
        """
        return self.set_value(device_id, 0)

    def activate_scene(self, scene_id):
        """
        Will activate the scene with the given ID.

        :param scene_id: scene id, e.g. 23
        """
        if scene_id in self.scenes:
            cmd = '{"CommuniqueType":"CreateRequest",' \
                  '"Header":{"Url":"/virtualbutton/%s/commandprocessor"},' \
                  '"Body":{"Command":{"CommandType":"PressAndRelease"}}}' \
                  '\n' % scene_id
            return self._send_command(cmd)

    def _get_zone_id(self, device_id):
        """
        Return the zone id for an given device.

        :param device_id: device id for which to retrieve a zone id
        """
        device = self.devices[device_id]
        if 'zone' in device:
            return device['zone']
        return None

    def _send_command(self, cmd):
        """Send a command to the bridge."""
        self._ssl_sock.send(cmd)

    def _monitor(self):
        """Event monitoring loop."""
        while True:
            try:
                # require a certificate from the server
                ssl_output = self._ssl_sock.recv(1)
                response = ssl_output
                while ssl_output != "\n":
                    ssl_output = self._ssl_sock.recv(1)
                    response += ssl_output

                _LOG.debug(response)
                resp_parts = response.split(b'\r\n')
                try:
                    for resp in resp_parts:
                        if resp:
                            resp_json = json.loads(resp.decode("UTF-8"))
                            self._handle_response(resp_json)
                except ValueError:
                    _LOG.error("Invalid response "
                               "from SmartBridge: " + response.decode("UTF-8"))
    except Exception as e:
print "Shit something fucked up!"
print e
            except ConnectionError:
                self.logged_in = False

    def _handle_response(self, resp_json):
        """
        Handle an event from the ssl interface.

        If a zone level was changed either by external means such as a Pico
        remote or by a command sent from us, the new level will appear on the
        SSH shell and the response is handled by this function.

        :param resp_json: full JSON response from the SSH shell
        """
        comm_type = resp_json['CommuniqueType']
        if comm_type == 'ReadResponse':
            body = resp_json['Body']
    try:
    if body['PingResponse'] != "":
print "*PONG*"
return 0
    except:
pass
            zone = body['ZoneStatus']['Zone']['href']
            zone = zone[zone.rfind('/') + 1:]
            level = body['ZoneStatus']['Level']
            _LOG.debug('zone=%s level=%s', zone, level)
            for _device_id in self.devices:
                device = self.devices[_device_id]
                if 'zone' in device:
                    if zone == device['zone']:
                        device['current_state'] = level
                        if _device_id in self._subscribers:
                            self._subscribers[_device_id]()

    def _login(self):
        """Connect and login to the Smart Bridge LEAP server using SSL."""
        if self.logged_in:
            return

        _LOG.debug("Connecting to Smart Bridge via SSL")
        connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # require a certificate from the server
        self._ssl_sock = ssl.wrap_socket(connection,
                                         keyfile=self._keyfile,
                                         certfile=self._certfile,
                                         ca_certs=self._ca_certs,
                                         cert_reqs=ssl.CERT_NONE,
                                         ssl_version=ssl.PROTOCOL_TLSv1_2)

        self._ssl_sock.connect((self._hostname, 8081))
        _LOG.debug("Successfully connected to Smart Bridge.")
        self.logged_in = True

    def _load_devices(self):
        """Load the device list from the SSL LEAP server interface."""
        _LOG.debug("Loading devices")
        self._ssl_sock.send(
            '{"CommuniqueType":"ReadRequest","Header":{"Url":"/device"}}\n')
        ssl_output = self._ssl_sock.recv(1)
        response = ssl_output
        while ssl_output != "\n":
            ssl_output = self._ssl_sock.recv(1)
            response += ssl_output
        _LOG.debug(response)
        device_json = json.loads(response.decode("UTF-8"))
        for device in device_json['Body']['Devices']:
            _LOG.debug(device)
            device_id = device['href'][device['href'].rfind('/') + 1:]
            device_zone = None
            if 'LocalZones' in device:
                device_zone = device['LocalZones'][0]['href']
                device_zone = device_zone[device_zone.rfind('/') + 1:]
            device_name = device['Name']
            device_type = device['DeviceType']
            self.devices[device_id] = {'device_id': device_id,
                                       'name': device_name,
                                       'type': device_type,
                                       'zone': device_zone,
                                       'current_state': -1}

    def _load_scenes(self):
        """
        Load the scenes from the Smart Bridge.

        Scenes are known as virtual buttons in the SSL LEAP interface.
        """
        _LOG.debug("Loading scenes from the Smart Bridge")
        self._ssl_sock.send(
            '{"CommuniqueType":"ReadRequest","Header":'
            '{"Url":"/virtualbutton"}}\n')
        ssl_output = self._ssl_sock.recv(1)
        response = ssl_output
        while ssl_output != "\n":
            ssl_output = self._ssl_sock.recv(1)
            response += ssl_output
        _LOG.debug(response)
        scene_json = json.loads(response.decode("UTF-8"))
        for scene in scene_json['Body']['VirtualButtons']:
            _LOG.debug(scene)
            if scene['IsProgrammed']:
                scene_id = scene['href'][scene['href'].rfind('/') + 1:]
                scene_name = scene['Name']
                self.scenes[scene_id] = {'scene_id': scene_id,
                                         'name': scene_name}

3) Create your main program to do your bidding.  I'll show you how to get started:

import smartbridge, time
# This will open a connection to your bridge. You will need to put in the static IP of your bridge in here #and you can add paths to the crt files in needed.
hub = smartbridge.Smartbridge('IP OF YOUR BRIDGE', 'caseta.key', 'caseta.crt', 'caseta-bridge.crt')
lastping = time.time()
while 1:
            # Check if the hub has been pinged in the last minute, if not go ahead and ping it
    if time.time() > lastping + 60:
print "*PING*"
lastping = time.time()
    hub.ping() # keep alive

fetch = hub.get_devices()   # Grab the whole dict of devices and states from the hub
for item in fetch:
                print fetch[item]   # this will print out the dict of each known device.
                print "Name: %s   Brightness: %s" % (fetch[item]['name'],fetch[item]['current_state']) # this
                   # will print out the name and brightness (0-100) of each device in the list
                hub.set_value(item, 100)  # This will set each item to 100% brightness
                # "Item" in this case is the DID or Device ID, and the 100 above is the brightness level of                          #that  device.  You can  easily check for a specific device 'name' and only alter that device.
                # The Smartbridge module supports other functions too but there doesn't seem to be                                  # documentation on them and I think most use cases don't need them.   This should be                            #enough to get most people started fetching real-time status data and controlling lights, if                        #you want to dig into the code above to see what else it can do be my guest!

4) That should be enough to get you started, if you found this info useful please leave a comment and let me know.

Huge thanks to gurumitts & mdonoughe for their time and effort in figuring this out even though his readme is crap :P

Important Links:
https://github.com/mdonoughe/pylutron-caseta              Github Page for Pylutron_caseta
https://home-assistant.io/components/lutron_caseta/      Home Assistant Lutron Caseta module page


No comments:

Post a Comment