Skip to content

Commit

Permalink
Almost have a working mock :)
Browse files Browse the repository at this point in the history
  • Loading branch information
andylockran committed Feb 5, 2024
1 parent d6022e5 commit 1300191
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 56 deletions.
2 changes: 1 addition & 1 deletion heatmiserv3/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
class HeatmiserUH1(object):
"""
Represents the UH1 interface that holds the serial
connection, and can have multiple thermostats
connection, and can manage up to 32 devices.
"""

def __init__(self, serialport):
Expand Down
51 changes: 26 additions & 25 deletions heatmiserv3/heatmiser.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,17 +230,18 @@ def _hm_read_address(self):
logging.info("Sending read frame")
response = self._hm_send_address(self.address, 0, 0, 0)
logging.debug(response)
lookup = self.config["keys"]
offset = self.config["offset"]
keydata = {}
for i in lookup:
try:
kdata = lookup[i]
ddata = response[i + offset]
keydata[i] = {"label": kdata, "value": ddata}
except IndexError:
logging.info("Finished processing at %d", i)
return keydata
return response
# lookup = self.config["keys"]
# offset = self.config["offset"]
# keydata = {}
# for i in lookup:
# try:
# kdata = lookup[i]
# ddata = response[i + offset]
# keydata[i] = {"label": kdata, "value": ddata}
# except IndexError:
# logging.info("Finished processing at %d", i)
# return keydata

def read_dcb(self):
"""
Expand All @@ -257,38 +258,38 @@ def get_frost_temp(self):
"""
Returns the temperature
"""
return self._hm_read_address()[17]["value"]
return self.dcb[17]

def get_target_temp(self):
"""
Returns the temperature
"""
return self._hm_read_address()[18]["value"]
return self.dcb[18]

def get_floormax_temp(self):
"""
Returns the temperature
"""
return self._hm_read_address()[19]["value"]
return self.dcb[19]

def get_status(self):
return self._hm_read_address()[21]["value"]
return self.dcb[21]

def get_heating(self):
return self._hm_read_address()[23]["value"]
return self.dcb[23]

def get_thermostat_id(self):
return self.dcb[11]["value"]
return self.dcb[11]

def get_temperature_format(self):
temp_format = self.dcb[5]["value"]
temp_format = self.dcb[5]
if temp_format == 00:
return "C"
else:
return "F"

def get_sensor_selection(self):
sensor = self.dcb[13]["value"]
sensor = self.dcb[13]
answers = {
0: "Built in air sensor",
1: "Remote air sensor",
Expand All @@ -299,7 +300,7 @@ def get_sensor_selection(self):
return answers[sensor]

def get_program_mode(self):
mode = self.dcb[16]["value"]
mode = self.dcb[16]
modes = {0: "5/2 mode", 1: "7 day mode"}
return modes[mode]

Expand All @@ -308,16 +309,16 @@ def get_frost_protection(self):

def get_floor_temp(self):
return (
(self.dcb[31]["value"] / 10)
if (int(self.dcb[13]["value"]) > 1)
else (self.dcb[33]["value"] / 10)
(self.dcb[31] / 10)
if (int(self.dcb[13]) > 1)
else (self.dcb[33] / 10)
)

def get_sensor_error(self):
return self.dcb[34]["value"]
return self.dcb[34]

def get_current_state(self):
return self.dcb[35]["value"]
return self.dcb[35]

def set_frost_protect_mode(self, onoff):
self._hm_send_address(self.address, 23, onoff, 1)
Expand Down
103 changes: 78 additions & 25 deletions tests/serial_stubs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@

from heatmiserv3 import crc16, constants, connection
from heatmiserv3 import crc16, constants, connection, heatmiser
import serial
from mock_serial import MockSerial
from mock import patch
import logging, sys


Expand All @@ -11,28 +12,13 @@
format="%(levelname)s - %(message)s"
)


MockSerialThermostat = MockSerial()
MockSerialThermostat.open()
device = MockSerialThermostat
serialport = serial.Serial(device.port)
# self.serialport = serial.serial_for_url("socket://" + ipaddress + ":" + port)
### Serial Connection Settings
serialport.baudrate = constants.COM_BAUD
serialport.bytesize = constants.COM_SIZE
serialport.parity = constants.COM_PARITY
serialport.stopbits = constants.COM_STOP
serialport.timeout = constants.COM_TIMEOUT



def interpret_message_generate_response(message):
def interpret_message_generate_response(message, dcb=bytearray([0,64,0,15,2,0,2,1,0,0,0,1,0,0,0,20,0,15,21,28,1,1,0,0,0,0,0,0,255,255,255,255,0,201,0,0,4,16,22,18,24,0,19,24,0,5,24,0,19,24,0,5,5,30,23,22,0,20,24,0,16,24,0,16])):
"""
Calculates the CRC of the message
Calculates the CRC of the response
"""
logging.info("Processing mock message.")
logging.info("Processing mock response.")
crc = crc16.CRC16()
logging.debug(f"Generating response to ${message}")
logging.debug(f"Generating response to {message}")
response=bytearray(73)
logging.debug(type(response))
response[0] = list(message)[2] # Destination address
Expand All @@ -45,7 +31,7 @@ def interpret_message_generate_response(message):
response[7] = 64 ## Action number of bytes read (low 8 bit)
response[8] = 0 ## Action number of bytes read (high 8 bit)
logging.debug("Bytearray response is: %s", response)
response[9:73] = bytearray([0,64,0,15,2,0,2,1,0,0,0,1,0,0,0,20,0,15,21,28,1,1,0,0,0,0,0,0,255,255,255,255,0,201,0,0,4,16,22,18,24,0,19,24,0,5,24,0,19,24,0,5,5,30,23,22,0,20,24,0,16,24,0,16]) ## Contents
response[9:73] = dcb
data = list(response)
data = data + crc.run(data)
checksum = data[len(data) - 2:]
Expand All @@ -56,24 +42,91 @@ def interpret_message_generate_response(message):
logging.debug("Bytearray response is: %s", data)
return bytearray(data)

class MockHeatmiserPRT(heatmiser.HeatmiserThermostatPRT):
def __init__(self, address, uh1):
super(MockHeatmiserPRT, self).__init__(address, uh1)
self.dcb = bytearray([0,64,0,15,2,0,2,1,0,0,0,1,0,0,0,20,0,15,21,28,1,1,0,0,0,0,0,0,255,255,255,255,0,201,0,0,4,16,22,18,24,0,19,24,0,5,24,0,19,24,0,5,5,30,23,22,0,20,24,0,16,24,0,16])

def read_dcb(self):
return self.dcb

def generate_response(self, message):
dcb = self.dcb
return interpret_message_generate_response(message, dcb)

def generate_reply(self, message):
logging.debug(f"Attempting to write {message}")
crc = crc16.CRC16()
logging.debug("Testing logging")
response = bytearray(5)
response[0] = list(message)[2] # Destination address
response[1] = 7 if list(message)[1] > 10 else 75 ## Low 8 bit
response[2] = 0 ## High 8 bit
response[3] = list(message)[0] ## Source address
response[4] = 1 ## Write
logging.debug(f"Printing response {response}")
data = list(response)
data = data + crc.run(data)
logging.debug(f"Writing response to write request {data}")
return bytearray(data)

def process_message(self, message):
logging.info("Processing mock message.")
logging.debug(f"Processing {message}")
data = list(message)
if data[3] == 1:
logging.debug("Processing write message")
return self.generate_reply(message)
elif data[3] == 0:
logging.debug("Processing read messages")
return self.generate_response(message)
else:
logging.debug(data[3])
logging.error("Cannot process message")
exit(1)


MockSerialPort = MockSerial()
MockSerialPort.open()
device = MockSerialPort
serialport = serial.Serial(device.port)
# self.serialport = serial.serial_for_url("socket://" + ipaddress + ":" + port)
### Serial Connection Settings
serialport.baudrate = constants.COM_BAUD
serialport.bytesize = constants.COM_SIZE
serialport.parity = constants.COM_PARITY
serialport.stopbits = constants.COM_STOP
serialport.timeout = constants.COM_TIMEOUT


MockUH1 = connection.HeatmiserUH1(serialport)
thermo1 = MockHeatmiserPRT(1, MockUH1)

response = thermo1.process_message(b'\x01\n\x81\x00\x00\x00\xff\xff,\t')
print(response)

write_temp = thermo1.process_message(b'\x01\x0b\x81\x01\x12\x00\x01\x00\x16\xd8v')

## Mock Thermostat 1
MockSerialThermostat.stub(
MockSerialPort.stub(
receive_bytes=b'\x01\n\x81\x00\x00\x00\xff\xff,\t',
send_bytes=interpret_message_generate_response(b'\x01\n\x81\x00\x00\x00\xff\xff,\t')
# send_bytes=calculate_crc_in_bytearray(b'\x01\n\x81\x00\x00\x00\xff\xff,\t')
)

## Mock Thermostat 2
MockSerialThermostat.stub(
MockSerialPort.stub(
receive_bytes=b'\x02\n\x81\x00\x00\x00\xff\xffY\xc1',
send_bytes=interpret_message_generate_response(b'\x02\n\x81\x00\x00\x00\xff\xffY\xc1')
)

# Mock Thermostat 3
MockSerialThermostat.stub(
MockSerialPort.stub(
receive_bytes=b'\x03\n\x81\x00\x00\x00\xff\xff\x8a\x86',
send_bytes=interpret_message_generate_response(b'\x03\n\x81\x00\x00\x00\xff\xff,\t,\xd4\x8d')
)

MockUH1 = connection.HeatmiserUH1(serialport)
MockSerialPort.stub(
receive_bytes=b'\x01\x0b\x81\x01\x12\x00\x01\x00\x16\xd8v',
send_bytes=write_temp
)
3 changes: 1 addition & 2 deletions tests/test_heatmiser.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ def setUp(self):
def test_thermo1_temperature(self):
""" Initialises the thermo1 thermostat, and checks the temperature is at 21*C"""
thermo1 = heatmiser.HeatmiserThermostat(1, self.uh1)
assert thermo1.dcb[18]['label'] == 'Set room temp'
assert thermo1.dcb[18]['value'] == 21
assert type(thermo1.dcb) == list

def tearDown(self):
pass
11 changes: 8 additions & 3 deletions tests/test_heatmiser_prt.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging, sys
from heatmiserv3.formats import message

from .serial_stubs import MockUH1
from .serial_stubs import MockUH1, MockHeatmiserPRT

logging.basicConfig(
stream=sys.stdout,
Expand All @@ -23,11 +23,16 @@ class TestHeatmiserThermostatMethods(unittest.TestCase):
def setUp(self):
# @TODO - Setup the mock interface for serial to write the tests.
self.uh1 = MockUH1
self.thermo1 = MockHeatmiserPRT(1, self.uh1)

def test_thermo1_temperature(self):
""" Initialises the thermo1 thermostat, and checks the temperature is at 21*C"""
thermo1 = heatmiser.HeatmiserThermostatPRT(1, self.uh1)
assert thermo1.get_target_temp() == 21
assert self.thermo1.get_target_temp() == 21

def xtest_thermo1_temperature(self):
""" Initialises the thermo1 thermostat, and checks the temperature is at 21*C"""
# self.thermo1.set_target_temp(22)
# assert self.thermo1.get_target_temp() == 22

def tearDown(self):
pass
39 changes: 39 additions & 0 deletions tests/test_stubs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Tests for HeatmiserThermostat and CRC Methods"""
import unittest
from heatmiserv3 import heatmiser
from heatmiserv3 import connection
from heatmiserv3 import crc16
import logging, sys
from heatmiserv3.formats import message

from .serial_stubs import MockUH1, MockHeatmiserPRT

logging.basicConfig(
stream=sys.stdout,
level=logging.DEBUG,
format="%(levelname)s - %(message)s"
)


class TestStubs(unittest.TestCase):
"""
This test case tests the PRT Thermostat in 5/2 mode, where there are 64 bytes of information
"""

def setUp(self):
# @TODO - Setup the mock interface for serial to write the tests.
self.uh1 = MockUH1
self.thermo1 = MockHeatmiserPRT(1, self.uh1)

def test_thermo1_get_target_temperature(self):
""" Initialises the thermo1 thermostat, and checks the temperature is at 21*C"""
assert self.thermo1.get_target_temp() == 21

# def test_thermo1_set_target_temperature(self):
# """ Initialises the thermo1 thermostat, and checks the temperature is at 21*C"""
# self.thermo1.set_target_temp(22)
# assert self.thermo1.get_target_temp() == 22


def tearDown(self):
pass

0 comments on commit 1300191

Please sign in to comment.