Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions api/tests/opentrons/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ async def _build_ot2_hw() -> AsyncIterator[ThreadManager[HardwareControlAPI]]:
yield hw_sim
finally:
config.robot_configs.clear()
for m in hw_sim.attached_modules:
await m.cleanup()
hw_sim.set_config(old_config)
hw_sim.clean_up()

Expand All @@ -189,6 +191,8 @@ async def _build_ot3_hw() -> AsyncIterator[ThreadManager[HardwareControlAPI]]:
yield hw_sim
finally:
config.robot_configs.clear()
for m in hw_sim.attached_modules:
await m.cleanup()
hw_sim.set_config(old_config)
hw_sim.clean_up()

Expand Down Expand Up @@ -233,11 +237,15 @@ async def hardware(request, virtual_smoothie_env):
# Async because ProtocolContext.__init__() needs an event loop,
# so this fixture needs to run in an event loop.
@pytest.fixture
async def ctx(hardware) -> ProtocolContext:
return ProtocolContext(
async def ctx(hardware) -> AsyncIterator[ProtocolContext]:
c = ProtocolContext(
implementation=ProtocolContextImplementation(sync_hardware=hardware.sync),
loop=asyncio.get_running_loop(),
)
yield c
# Manually clean up all the modules.
for m in c.loaded_modules.items():
m[1]._module.cleanup()


@pytest.fixture
Expand Down
105 changes: 44 additions & 61 deletions api/tests/opentrons/hardware_control/modules/test_hc_tempdeck.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ def usb_port():
)


async def test_sim_initialization(usb_port):
@pytest.fixture
async def subject(usb_port: USBPort) -> modules.AbstractModule:
"""Test subject"""
temp = await modules.build(
port="/dev/ot_module_sim_tempdeck0",
usb_port=usb_port,
Expand All @@ -27,78 +29,57 @@ async def test_sim_initialization(usb_port):
loop=asyncio.get_running_loop(),
execution_manager=ExecutionManager(),
)
assert isinstance(temp, modules.AbstractModule)
yield temp
await temp.cleanup()


async def test_sim_state(usb_port):
temp = await modules.TempDeck.build(
port="/dev/ot_module_sim_tempdeck0",
usb_port=usb_port,
simulating=True,
interrupt_callback=lambda x: None,
loop=asyncio.get_running_loop(),
execution_manager=ExecutionManager(),
)
await temp.wait_next_poll()
assert temp.temperature == 0
assert temp.target is None
assert temp.status == "idle"
assert temp.live_data["status"] == temp.status
assert temp.live_data["data"]["currentTemp"] == temp.temperature
assert temp.live_data["data"]["targetTemp"] == temp.target
status = temp.device_info
async def test_sim_initialization(subject: modules.AbstractModule):
assert isinstance(subject, modules.AbstractModule)


async def test_sim_state(subject: modules.AbstractModule):
await subject.wait_next_poll()
assert subject.temperature == 0
assert subject.target is None
assert subject.status == "idle"
assert subject.live_data["status"] == subject.status
assert subject.live_data["data"]["currentTemp"] == subject.temperature
assert subject.live_data["data"]["targetTemp"] == subject.target
status = subject.device_info
assert status["serial"] == "dummySerialTD"
# return v1 if sim_model is not passed
assert status["model"] == "temp_deck_v1.1"
assert status["version"] == "dummyVersionTD"


async def test_sim_update(usb_port):
temp = await modules.TempDeck.build(
port="/dev/ot_module_sim_tempdeck0",
usb_port=usb_port,
simulating=True,
interrupt_callback=lambda x: None,
loop=asyncio.get_running_loop(),
execution_manager=ExecutionManager(),
polling_frequency=0,
)
await temp.set_temperature(10)
assert temp.temperature == 10
assert temp.target == 10
assert temp.status == "holding at target"
await temp.deactivate()
await temp.wait_next_poll()
assert temp.temperature == 23
assert temp.target is None
assert temp.status == "idle"


async def test_revision_model_parsing(usb_port):
mag = await modules.TempDeck.build(
port="",
simulating=True,
usb_port=usb_port,
interrupt_callback=lambda x: None,
loop=asyncio.get_running_loop(),
execution_manager=ExecutionManager(),
polling_frequency=0,
)
mag._device_info["model"] = "temp_deck_v20"
assert mag.model() == "temperatureModuleV2"
mag._device_info["model"] = "temp_deck_v4.0"
assert mag.model() == "temperatureModuleV1"
del mag._device_info["model"]
assert mag.model() == "temperatureModuleV1"
mag._device_info["model"] = "temp_deck_v1.1"
assert mag.model() == "temperatureModuleV1"
async def test_sim_update(subject: modules.AbstractModule):
await subject.set_temperature(10)
assert subject.temperature == 10
assert subject.target == 10
assert subject.status == "holding at target"
await subject.deactivate()
await subject.wait_next_poll()
assert subject.temperature == 23
assert subject.target is None
assert subject.status == "idle"


async def test_poll_error(usb_port) -> None:
async def test_revision_model_parsing(subject: modules.AbstractModule):
subject._device_info["model"] = "temp_deck_v20"
assert subject.model() == "temperatureModuleV2"
subject._device_info["model"] = "temp_deck_v4.0"
assert subject.model() == "temperatureModuleV1"
del subject._device_info["model"]
assert subject.model() == "temperatureModuleV1"
subject._device_info["model"] = "temp_deck_v1.1"
assert subject.model() == "temperatureModuleV1"


async def test_poll_error(usb_port: USBPort) -> None:
mock_driver = AsyncMock(spec=AbstractTempDeckDriver)
mock_driver.get_temperature.side_effect = ValueError("hello!")

magdeck = modules.TempDeck(
tempdeck = modules.TempDeck(
port="",
usb_port=usb_port,
execution_manager=AsyncMock(spec=ExecutionManager),
Expand All @@ -108,4 +89,6 @@ async def test_poll_error(usb_port) -> None:
polling_frequency=1,
)
with pytest.raises(ValueError, match="hello!"):
await magdeck.wait_next_poll()
await tempdeck.wait_next_poll()

await tempdeck.cleanup()
150 changes: 65 additions & 85 deletions api/tests/opentrons/hardware_control/modules/test_hc_thermocycler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ def usb_port() -> USBPort:
)


async def test_sim_initialization(usb_port):
@pytest.fixture
async def subject(usb_port: USBPort) -> modules.Thermocycler:
"""Test subject"""
therm = await modules.build(
port="/dev/ot_module_sim_thermocycler0",
usb_port=usb_port,
Expand All @@ -27,105 +29,82 @@ async def test_sim_initialization(usb_port):
loop=asyncio.get_running_loop(),
execution_manager=ExecutionManager(),
)
yield therm
await therm.cleanup()

assert isinstance(therm, modules.AbstractModule)

async def test_sim_initialization(subject: modules.Thermocycler):
assert isinstance(subject, modules.AbstractModule)

async def test_lid(usb_port):
therm = await modules.build(
port="/dev/ot_module_sim_thermocycler0",
usb_port=usb_port,
which="thermocycler",
simulating=True,
loop=asyncio.get_running_loop(),
execution_manager=ExecutionManager(),
)

await therm.open()
await therm.wait_next_poll()
assert therm.lid_status == "open"
async def test_lid(subject: modules.Thermocycler):
await subject.open()
await subject.wait_next_poll()
assert subject.lid_status == "open"

await therm.close()
await therm.wait_next_poll()
assert therm.lid_status == "closed"
await subject.close()
await subject.wait_next_poll()
assert subject.lid_status == "closed"

await therm.close()
await therm.wait_next_poll()
assert therm.lid_status == "closed"
await subject.close()
await subject.wait_next_poll()
assert subject.lid_status == "closed"

await therm.open()
await therm.wait_next_poll()
assert therm.lid_status == "open"
await subject.open()
await subject.wait_next_poll()
assert subject.lid_status == "open"


async def test_sim_state(usb_port):
therm = await modules.build(
port="/dev/ot_module_sim_thermocycler0",
usb_port=usb_port,
which="thermocycler",
simulating=True,
loop=asyncio.get_running_loop(),
execution_manager=ExecutionManager(),
)

assert therm.temperature is None
assert therm.target is None
assert therm.status == "error"
assert therm.live_data["status"] == therm.status
assert therm.live_data["data"]["currentTemp"] == therm.temperature
assert therm.live_data["data"]["targetTemp"] == therm.target
status = therm.device_info
async def test_sim_state(subject: modules.Thermocycler):
assert subject.temperature == 23
assert subject.target is None
assert subject.status == "idle"
assert subject.live_data["status"] == subject.status
assert subject.live_data["data"]["currentTemp"] == subject.temperature
assert subject.live_data["data"]["targetTemp"] == subject.target
status = subject.device_info
assert status["serial"] == "dummySerialTC"
assert status["model"] == "dummyModelTC"
assert status["version"] == "dummyVersionTC"


async def test_sim_update(usb_port):
therm = await modules.build(
port="/dev/ot_module_sim_thermocycler0",
usb_port=usb_port,
which="thermocycler",
simulating=True,
loop=asyncio.get_running_loop(),
execution_manager=ExecutionManager(),
)

await therm.set_temperature(
async def test_sim_update(subject: modules.Thermocycler):
await subject.set_temperature(
temperature=10, hold_time_seconds=None, hold_time_minutes=None, volume=50
)
await therm.wait_next_poll()
assert therm.temperature == 10
assert therm.target == 10
assert therm.status == "holding at target"
# await asyncio.wait_for(therm.wait_for_temp(), timeout=0.2)
await therm.deactivate_block()
await therm.wait_next_poll()
assert therm.temperature == 23
assert therm.target is None
assert therm.status == "idle"

await therm.set_lid_temperature(temperature=80)
assert therm.lid_temp == 80
assert therm.lid_target == 80

await therm.deactivate_lid()
await therm.wait_next_poll()
assert therm.lid_temp == 23
assert therm.lid_target is None

await therm.set_temperature(temperature=10, volume=60, hold_time_seconds=2)
await therm.set_lid_temperature(temperature=70)
assert therm.temperature == 10
assert therm.target == 10
assert therm.lid_temp == 70
assert therm.lid_target == 70
await therm.deactivate()
await therm.wait_next_poll()
assert therm.temperature == 23
assert therm.target is None
assert therm.status == "idle"
assert therm.lid_temp == 23
assert therm.lid_target is None
await subject.wait_next_poll()
assert subject.temperature == 10
assert subject.target == 10
assert subject.status == "holding at target"

await subject.deactivate_block()
await subject.wait_next_poll()
assert subject.temperature == 23
assert subject.target is None
assert subject.status == "idle"

await subject.set_lid_temperature(temperature=80)
assert subject.lid_temp == 80
assert subject.lid_target == 80

await subject.deactivate_lid()
await subject.wait_next_poll()
assert subject.lid_temp == 23
assert subject.lid_target is None

await subject.set_temperature(temperature=10, volume=60, hold_time_seconds=2)
await subject.set_lid_temperature(temperature=70)
assert subject.temperature == 10
assert subject.target == 10
assert subject.lid_temp == 70
assert subject.lid_target == 70
await subject.deactivate()
await subject.wait_next_poll()
assert subject.temperature == 23
assert subject.target is None
assert subject.status == "idle"
assert subject.lid_temp == 23
assert subject.lid_target is None


@pytest.fixture
Expand Down Expand Up @@ -161,7 +140,8 @@ async def set_temperature_subject(
device_info={},
polling_interval_sec=0.001,
)
return hw_tc
yield hw_tc
await hw_tc.cleanup()


async def test_set_temperature_with_volume(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,5 @@ async def fake_task():
assert len(all_tasks) == 2 # current and other
assert other_task in all_tasks
assert cancellable_task not in all_tasks

other_task.cancel()
Loading