diff --git a/api/tests/opentrons/conftest.py b/api/tests/opentrons/conftest.py index 0f0a4047c3ec..f5d32781b05a 100755 --- a/api/tests/opentrons/conftest.py +++ b/api/tests/opentrons/conftest.py @@ -170,6 +170,8 @@ async def _build_ot2_hw() -> AsyncIterator[ThreadManager[HardwareControlAPI]]: yield hw_sim finally: config.robot_configs.clear() + for m in hw_sim.attached_modules: + await m.cleanup() hw_sim.set_config(old_config) hw_sim.clean_up() @@ -189,6 +191,8 @@ async def _build_ot3_hw() -> AsyncIterator[ThreadManager[HardwareControlAPI]]: yield hw_sim finally: config.robot_configs.clear() + for m in hw_sim.attached_modules: + await m.cleanup() hw_sim.set_config(old_config) hw_sim.clean_up() @@ -233,11 +237,15 @@ async def hardware(request, virtual_smoothie_env): # Async because ProtocolContext.__init__() needs an event loop, # so this fixture needs to run in an event loop. @pytest.fixture -async def ctx(hardware) -> ProtocolContext: - return ProtocolContext( +async def ctx(hardware) -> AsyncIterator[ProtocolContext]: + c = ProtocolContext( implementation=ProtocolContextImplementation(sync_hardware=hardware.sync), loop=asyncio.get_running_loop(), ) + yield c + # Manually clean up all the modules. + for m in c.loaded_modules.items(): + m[1]._module.cleanup() @pytest.fixture diff --git a/api/tests/opentrons/hardware_control/modules/test_hc_tempdeck.py b/api/tests/opentrons/hardware_control/modules/test_hc_tempdeck.py index c8eb805cbaea..ae80a173ad79 100644 --- a/api/tests/opentrons/hardware_control/modules/test_hc_tempdeck.py +++ b/api/tests/opentrons/hardware_control/modules/test_hc_tempdeck.py @@ -18,7 +18,9 @@ def usb_port(): ) -async def test_sim_initialization(usb_port): +@pytest.fixture +async def subject(usb_port: USBPort) -> modules.AbstractModule: + """Test subject""" temp = await modules.build( port="/dev/ot_module_sim_tempdeck0", usb_port=usb_port, @@ -27,78 +29,57 @@ async def test_sim_initialization(usb_port): loop=asyncio.get_running_loop(), execution_manager=ExecutionManager(), ) - assert isinstance(temp, modules.AbstractModule) + yield temp + await temp.cleanup() -async def test_sim_state(usb_port): - temp = await modules.TempDeck.build( - port="/dev/ot_module_sim_tempdeck0", - usb_port=usb_port, - simulating=True, - interrupt_callback=lambda x: None, - loop=asyncio.get_running_loop(), - execution_manager=ExecutionManager(), - ) - await temp.wait_next_poll() - assert temp.temperature == 0 - assert temp.target is None - assert temp.status == "idle" - assert temp.live_data["status"] == temp.status - assert temp.live_data["data"]["currentTemp"] == temp.temperature - assert temp.live_data["data"]["targetTemp"] == temp.target - status = temp.device_info +async def test_sim_initialization(subject: modules.AbstractModule): + assert isinstance(subject, modules.AbstractModule) + + +async def test_sim_state(subject: modules.AbstractModule): + await subject.wait_next_poll() + assert subject.temperature == 0 + assert subject.target is None + assert subject.status == "idle" + assert subject.live_data["status"] == subject.status + assert subject.live_data["data"]["currentTemp"] == subject.temperature + assert subject.live_data["data"]["targetTemp"] == subject.target + status = subject.device_info assert status["serial"] == "dummySerialTD" # return v1 if sim_model is not passed assert status["model"] == "temp_deck_v1.1" assert status["version"] == "dummyVersionTD" -async def test_sim_update(usb_port): - temp = await modules.TempDeck.build( - port="/dev/ot_module_sim_tempdeck0", - usb_port=usb_port, - simulating=True, - interrupt_callback=lambda x: None, - loop=asyncio.get_running_loop(), - execution_manager=ExecutionManager(), - polling_frequency=0, - ) - await temp.set_temperature(10) - assert temp.temperature == 10 - assert temp.target == 10 - assert temp.status == "holding at target" - await temp.deactivate() - await temp.wait_next_poll() - assert temp.temperature == 23 - assert temp.target is None - assert temp.status == "idle" - - -async def test_revision_model_parsing(usb_port): - mag = await modules.TempDeck.build( - port="", - simulating=True, - usb_port=usb_port, - interrupt_callback=lambda x: None, - loop=asyncio.get_running_loop(), - execution_manager=ExecutionManager(), - polling_frequency=0, - ) - mag._device_info["model"] = "temp_deck_v20" - assert mag.model() == "temperatureModuleV2" - mag._device_info["model"] = "temp_deck_v4.0" - assert mag.model() == "temperatureModuleV1" - del mag._device_info["model"] - assert mag.model() == "temperatureModuleV1" - mag._device_info["model"] = "temp_deck_v1.1" - assert mag.model() == "temperatureModuleV1" +async def test_sim_update(subject: modules.AbstractModule): + await subject.set_temperature(10) + assert subject.temperature == 10 + assert subject.target == 10 + assert subject.status == "holding at target" + await subject.deactivate() + await subject.wait_next_poll() + assert subject.temperature == 23 + assert subject.target is None + assert subject.status == "idle" -async def test_poll_error(usb_port) -> None: +async def test_revision_model_parsing(subject: modules.AbstractModule): + subject._device_info["model"] = "temp_deck_v20" + assert subject.model() == "temperatureModuleV2" + subject._device_info["model"] = "temp_deck_v4.0" + assert subject.model() == "temperatureModuleV1" + del subject._device_info["model"] + assert subject.model() == "temperatureModuleV1" + subject._device_info["model"] = "temp_deck_v1.1" + assert subject.model() == "temperatureModuleV1" + + +async def test_poll_error(usb_port: USBPort) -> None: mock_driver = AsyncMock(spec=AbstractTempDeckDriver) mock_driver.get_temperature.side_effect = ValueError("hello!") - magdeck = modules.TempDeck( + tempdeck = modules.TempDeck( port="", usb_port=usb_port, execution_manager=AsyncMock(spec=ExecutionManager), @@ -108,4 +89,6 @@ async def test_poll_error(usb_port) -> None: polling_frequency=1, ) with pytest.raises(ValueError, match="hello!"): - await magdeck.wait_next_poll() + await tempdeck.wait_next_poll() + + await tempdeck.cleanup() diff --git a/api/tests/opentrons/hardware_control/modules/test_hc_thermocycler.py b/api/tests/opentrons/hardware_control/modules/test_hc_thermocycler.py index 090d2a1d1940..c7b31d5f2861 100644 --- a/api/tests/opentrons/hardware_control/modules/test_hc_thermocycler.py +++ b/api/tests/opentrons/hardware_control/modules/test_hc_thermocycler.py @@ -18,7 +18,9 @@ def usb_port() -> USBPort: ) -async def test_sim_initialization(usb_port): +@pytest.fixture +async def subject(usb_port: USBPort) -> modules.Thermocycler: + """Test subject""" therm = await modules.build( port="/dev/ot_module_sim_thermocycler0", usb_port=usb_port, @@ -27,105 +29,82 @@ async def test_sim_initialization(usb_port): loop=asyncio.get_running_loop(), execution_manager=ExecutionManager(), ) + yield therm + await therm.cleanup() - assert isinstance(therm, modules.AbstractModule) +async def test_sim_initialization(subject: modules.Thermocycler): + assert isinstance(subject, modules.AbstractModule) -async def test_lid(usb_port): - therm = await modules.build( - port="/dev/ot_module_sim_thermocycler0", - usb_port=usb_port, - which="thermocycler", - simulating=True, - loop=asyncio.get_running_loop(), - execution_manager=ExecutionManager(), - ) - await therm.open() - await therm.wait_next_poll() - assert therm.lid_status == "open" +async def test_lid(subject: modules.Thermocycler): + await subject.open() + await subject.wait_next_poll() + assert subject.lid_status == "open" - await therm.close() - await therm.wait_next_poll() - assert therm.lid_status == "closed" + await subject.close() + await subject.wait_next_poll() + assert subject.lid_status == "closed" - await therm.close() - await therm.wait_next_poll() - assert therm.lid_status == "closed" + await subject.close() + await subject.wait_next_poll() + assert subject.lid_status == "closed" - await therm.open() - await therm.wait_next_poll() - assert therm.lid_status == "open" + await subject.open() + await subject.wait_next_poll() + assert subject.lid_status == "open" -async def test_sim_state(usb_port): - therm = await modules.build( - port="/dev/ot_module_sim_thermocycler0", - usb_port=usb_port, - which="thermocycler", - simulating=True, - loop=asyncio.get_running_loop(), - execution_manager=ExecutionManager(), - ) - - assert therm.temperature is None - assert therm.target is None - assert therm.status == "error" - assert therm.live_data["status"] == therm.status - assert therm.live_data["data"]["currentTemp"] == therm.temperature - assert therm.live_data["data"]["targetTemp"] == therm.target - status = therm.device_info +async def test_sim_state(subject: modules.Thermocycler): + assert subject.temperature == 23 + assert subject.target is None + assert subject.status == "idle" + assert subject.live_data["status"] == subject.status + assert subject.live_data["data"]["currentTemp"] == subject.temperature + assert subject.live_data["data"]["targetTemp"] == subject.target + status = subject.device_info assert status["serial"] == "dummySerialTC" assert status["model"] == "dummyModelTC" assert status["version"] == "dummyVersionTC" -async def test_sim_update(usb_port): - therm = await modules.build( - port="/dev/ot_module_sim_thermocycler0", - usb_port=usb_port, - which="thermocycler", - simulating=True, - loop=asyncio.get_running_loop(), - execution_manager=ExecutionManager(), - ) - - await therm.set_temperature( +async def test_sim_update(subject: modules.Thermocycler): + await subject.set_temperature( temperature=10, hold_time_seconds=None, hold_time_minutes=None, volume=50 ) - await therm.wait_next_poll() - assert therm.temperature == 10 - assert therm.target == 10 - assert therm.status == "holding at target" - # await asyncio.wait_for(therm.wait_for_temp(), timeout=0.2) - await therm.deactivate_block() - await therm.wait_next_poll() - assert therm.temperature == 23 - assert therm.target is None - assert therm.status == "idle" - - await therm.set_lid_temperature(temperature=80) - assert therm.lid_temp == 80 - assert therm.lid_target == 80 - - await therm.deactivate_lid() - await therm.wait_next_poll() - assert therm.lid_temp == 23 - assert therm.lid_target is None - - await therm.set_temperature(temperature=10, volume=60, hold_time_seconds=2) - await therm.set_lid_temperature(temperature=70) - assert therm.temperature == 10 - assert therm.target == 10 - assert therm.lid_temp == 70 - assert therm.lid_target == 70 - await therm.deactivate() - await therm.wait_next_poll() - assert therm.temperature == 23 - assert therm.target is None - assert therm.status == "idle" - assert therm.lid_temp == 23 - assert therm.lid_target is None + await subject.wait_next_poll() + assert subject.temperature == 10 + assert subject.target == 10 + assert subject.status == "holding at target" + + await subject.deactivate_block() + await subject.wait_next_poll() + assert subject.temperature == 23 + assert subject.target is None + assert subject.status == "idle" + + await subject.set_lid_temperature(temperature=80) + assert subject.lid_temp == 80 + assert subject.lid_target == 80 + + await subject.deactivate_lid() + await subject.wait_next_poll() + assert subject.lid_temp == 23 + assert subject.lid_target is None + + await subject.set_temperature(temperature=10, volume=60, hold_time_seconds=2) + await subject.set_lid_temperature(temperature=70) + assert subject.temperature == 10 + assert subject.target == 10 + assert subject.lid_temp == 70 + assert subject.lid_target == 70 + await subject.deactivate() + await subject.wait_next_poll() + assert subject.temperature == 23 + assert subject.target is None + assert subject.status == "idle" + assert subject.lid_temp == 23 + assert subject.lid_target is None @pytest.fixture @@ -161,7 +140,8 @@ async def set_temperature_subject( device_info={}, polling_interval_sec=0.001, ) - return hw_tc + yield hw_tc + await hw_tc.cleanup() async def test_set_temperature_with_volume( diff --git a/api/tests/opentrons/hardware_control/test_execution_manager.py b/api/tests/opentrons/hardware_control/test_execution_manager.py index 4ad3c4475e0f..0b3b0f6b2f8d 100644 --- a/api/tests/opentrons/hardware_control/test_execution_manager.py +++ b/api/tests/opentrons/hardware_control/test_execution_manager.py @@ -73,3 +73,5 @@ async def fake_task(): assert len(all_tasks) == 2 # current and other assert other_task in all_tasks assert cancellable_task not in all_tasks + + other_task.cancel() diff --git a/api/tests/opentrons/hardware_control/test_modules.py b/api/tests/opentrons/hardware_control/test_modules.py index e2863d2823fe..f9cbce26ba73 100644 --- a/api/tests/opentrons/hardware_control/test_modules.py +++ b/api/tests/opentrons/hardware_control/test_modules.py @@ -21,6 +21,8 @@ async def test_get_modules_simulating(): await asyncio.sleep(0.05) from_api = api.attached_modules assert sorted([mod.name() for mod in from_api]) == sorted(mods) + for m in api.attached_modules: + await m.cleanup() async def test_module_caching(): @@ -84,24 +86,16 @@ async def test_filtering_modules(): assert len(filtered_modules) == 2 assert filtered_modules == api.attached_modules[2:4] + for m in api.attached_modules: + await m.cleanup() -async def test_module_update_integration(monkeypatch): + +@pytest.fixture +async def mod_tempdeck(): from opentrons.hardware_control import modules loop = asyncio.get_running_loop() - def async_return(result): - f = asyncio.Future() - f.set_result(result) - return f - - bootloader_kwargs = { - "stdout": asyncio.subprocess.PIPE, - "stderr": asyncio.subprocess.PIPE, - "loop": loop, - } - - # test temperature module update with avrdude bootloader usb_port = USBPort( name="", hub=None, @@ -118,26 +112,22 @@ def async_return(result): execution_manager=ExecutionManager(), sim_model="temperatureModuleV2", ) + yield tempdeck + await tempdeck.cleanup() - upload_via_avrdude_mock = mock.Mock( - return_value=(async_return((True, "avrdude bootloader worked"))) - ) - monkeypatch.setattr(modules.update, "upload_via_avrdude", upload_via_avrdude_mock) - async def mock_find_avrdude_bootloader_port(): - return "ot_module_avrdude_bootloader1" +@pytest.fixture +async def mod_magdeck(): + from opentrons.hardware_control import modules - monkeypatch.setattr( - modules.update, "find_bootloader_port", mock_find_avrdude_bootloader_port - ) + loop = asyncio.get_running_loop() - await modules.update_firmware(tempdeck, "fake_fw_file_path", loop) - upload_via_avrdude_mock.assert_called_once_with( - "ot_module_avrdude_bootloader1", "fake_fw_file_path", bootloader_kwargs + usb_port = USBPort( + name="", + hub=None, + port_number=0, + device_path="/dev/ot_module_sim_magdeck0", ) - upload_via_avrdude_mock.reset_mock() - - # test magnetic module update with avrdude bootloader magdeck = await modules.build( port="/dev/ot_module_sim_magdeck0", @@ -147,13 +137,22 @@ async def mock_find_avrdude_bootloader_port(): loop=loop, execution_manager=ExecutionManager(), ) + yield magdeck + await magdeck.cleanup() - await modules.update_firmware(magdeck, "fake_fw_file_path", loop) - upload_via_avrdude_mock.assert_called_once_with( - "ot_module_avrdude_bootloader1", "fake_fw_file_path", bootloader_kwargs - ) - # test thermocycler module update with bossa bootloader +@pytest.fixture +async def mod_thermocycler(): + from opentrons.hardware_control import modules + + loop = asyncio.get_running_loop() + + usb_port = USBPort( + name="", + hub=None, + port_number=0, + device_path="/dev/ot_module_sim_thermocycler0", + ) thermocycler = await modules.build( port="/dev/ot_module_sim_thermocycler0", @@ -163,7 +162,54 @@ async def mock_find_avrdude_bootloader_port(): loop=loop, execution_manager=ExecutionManager(), ) + yield thermocycler + await thermocycler.cleanup() + + +async def test_module_update_integration( + monkeypatch, mod_tempdeck, mod_magdeck, mod_thermocycler +): + from opentrons.hardware_control import modules + + loop = asyncio.get_running_loop() + + def async_return(result): + f = asyncio.Future() + f.set_result(result) + return f + bootloader_kwargs = { + "stdout": asyncio.subprocess.PIPE, + "stderr": asyncio.subprocess.PIPE, + "loop": loop, + } + + upload_via_avrdude_mock = mock.Mock( + return_value=(async_return((True, "avrdude bootloader worked"))) + ) + monkeypatch.setattr(modules.update, "upload_via_avrdude", upload_via_avrdude_mock) + + async def mock_find_avrdude_bootloader_port(): + return "ot_module_avrdude_bootloader1" + + monkeypatch.setattr( + modules.update, "find_bootloader_port", mock_find_avrdude_bootloader_port + ) + + # test temperature module update with avrdude bootloader + await modules.update_firmware(mod_tempdeck, "fake_fw_file_path", loop) + upload_via_avrdude_mock.assert_called_once_with( + "ot_module_avrdude_bootloader1", "fake_fw_file_path", bootloader_kwargs + ) + upload_via_avrdude_mock.reset_mock() + + # test magnetic module update with avrdude bootloader + await modules.update_firmware(mod_magdeck, "fake_fw_file_path", loop) + upload_via_avrdude_mock.assert_called_once_with( + "ot_module_avrdude_bootloader1", "fake_fw_file_path", bootloader_kwargs + ) + + # test thermocycler module update with bossa bootloader upload_via_bossa_mock = mock.Mock( return_value=(async_return((True, "bossa bootloader worked"))) ) @@ -176,7 +222,7 @@ async def mock_find_bossa_bootloader_port(): modules.update, "find_bootloader_port", mock_find_bossa_bootloader_port ) - await modules.update_firmware(thermocycler, "fake_fw_file_path", loop) + await modules.update_firmware(mod_thermocycler, "fake_fw_file_path", loop) upload_via_bossa_mock.assert_called_once_with( "ot_module_bossa_bootloader1", "fake_fw_file_path", bootloader_kwargs ) @@ -215,6 +261,8 @@ async def test_get_bundled_fw(monkeypatch, tmpdir): assert api.attached_modules[2].bundled_fw == BundledFirmware( version="0.1.2", path=dummy_tc_file ) + for m in api.attached_modules: + await m.cleanup() @pytest.mark.parametrize(