Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions api/src/opentrons/protocol_engine/execution/labware_movement.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(
"""Initialize a LabwareMovementHandler instance."""
self._hardware_api = hardware_api
self._state_store = state_store
self._equipment = equipment
self._thermocycler_plate_lifter = (
thermocycler_plate_lifter
or ThermocyclerPlateLifter(
Expand All @@ -72,7 +73,13 @@ def __init__(
self._tc_movement_flagger = (
thermocycler_movement_flagger
or ThermocyclerMovementFlagger(
state_store=self._state_store, hardware_api=self._hardware_api
state_store=self._state_store,
hardware_api=self._hardware_api,
equipment=self._equipment
or EquipmentHandler(
hardware_api=self._hardware_api,
state_store=self._state_store,
),
Comment thread
caila-marashaj marked this conversation as resolved.
)
)
self._hs_movement_flagger = (
Expand Down Expand Up @@ -264,7 +271,7 @@ async def ensure_movement_not_obstructed_by_module(
)
for parent in (current_parent, new_location):
try:
await self._tc_movement_flagger.raise_if_labware_in_non_open_thermocycler(
await self._tc_movement_flagger.ensure_labware_in_open_thermocycler(
labware_parent=parent
)
await self._hs_movement_flagger.raise_if_labware_latched_on_heater_shaker(
Expand Down
13 changes: 10 additions & 3 deletions api/src/opentrons/protocol_engine/execution/movement.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from .heater_shaker_movement_flagger import HeaterShakerMovementFlagger

from .gantry_mover import GantryMover
from .equipment import EquipmentHandler


log = logging.getLogger(__name__)
Expand All @@ -43,14 +44,21 @@ def __init__(
model_utils: Optional[ModelUtils] = None,
thermocycler_movement_flagger: Optional[ThermocyclerMovementFlagger] = None,
heater_shaker_movement_flagger: Optional[HeaterShakerMovementFlagger] = None,
equipment: Optional[EquipmentHandler] = None,
) -> None:
"""Initialize a MovementHandler instance."""
self._state_store = state_store
self._model_utils = model_utils or ModelUtils()
self._tc_movement_flagger = (
thermocycler_movement_flagger
or ThermocyclerMovementFlagger(
state_store=self._state_store, hardware_api=hardware_api
state_store=self._state_store,
hardware_api=hardware_api,
equipment=equipment
or EquipmentHandler(
hardware_api=hardware_api,
state_store=state_store,
),
)
)
self._hs_movement_flagger = (
Expand Down Expand Up @@ -83,8 +91,7 @@ async def move_to_well(
self._state_store.labware.raise_if_labware_has_labware_on_top(
labware_id=labware_id
)

await self._tc_movement_flagger.raise_if_labware_in_non_open_thermocycler(
await self._tc_movement_flagger.ensure_labware_in_open_thermocycler(
labware_parent=self._state_store.labware.get_location(labware_id=labware_id)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
"""Helpers for flagging unsafe movements to a Thermocycler Module."""

from typing import Optional

from opentrons.drivers.types import ThermocyclerLidStatus
from opentrons.hardware_control import HardwareControlAPI
from opentrons.hardware_control.modules import Thermocycler as HardwareThermocycler


from opentrons.protocol_engine.state.module_substates import ThermocyclerModuleId
from ..types import ModuleLocation, LabwareLocation
from ..state.state import StateStore
from ..errors import ThermocyclerNotOpenError, WrongModuleTypeError

from .equipment import EquipmentHandler


class ThermocyclerMovementFlagger:
"""A helper for flagging unsafe movements to a Thermocycler Module.
Expand All @@ -19,7 +22,10 @@ class ThermocyclerMovementFlagger:
"""

def __init__(
self, state_store: StateStore, hardware_api: HardwareControlAPI
self,
state_store: StateStore,
hardware_api: HardwareControlAPI,
equipment: EquipmentHandler,
) -> None:
"""Initialize the ThermocyclerMovementFlagger.

Expand All @@ -28,18 +34,59 @@ def __init__(
which Thermocycler a labware is in, if any.
hardware_api: The underlying hardware interface. Used to query
Thermocyclers' current lid states.
equipment: The protocol engine interface to move a present thermocycler to an
operable state if need be.
"""
self._state_store = state_store
self._hardware_api = hardware_api
self._equipment = equipment

async def _verify_tc_lid_status(self, module_id: str) -> None:
"""Ensure the thermocycler's lid state is correct or raise an error."""
try:
hw_tc_lid_status = await self._get_hardware_thermocycler_lid_status(
module_id=module_id
)
except self._HardwareThermocyclerMissingError as e:
raise ThermocyclerNotOpenError(
"Thermocycler must be open when moving to labware inside it,"
" but can't confirm Thermocycler's current status."
) from e

if (
hw_tc_lid_status == ThermocyclerLidStatus.IN_BETWEEN
or hw_tc_lid_status == ThermocyclerLidStatus.UNKNOWN
):
# NOTE(cm): due to a potential hardware bug, the thermocycler might lose its position
# status when idling in an open position and default to UNKNOWN or IN_BETWEEN.
# This tries to recover only from an unexpected known position.
await self._open_tc_lid(module_id=module_id)
hw_tc_lid_status = await self._get_hardware_thermocycler_lid_status(
module_id=module_id
)
if hw_tc_lid_status != ThermocyclerLidStatus.OPEN:
Comment thread
caila-marashaj marked this conversation as resolved.
raise ThermocyclerNotOpenError(
f"Thermocycler must be open when moving to labware inside it,"
f' but Thermocycler is currently "{hw_tc_lid_status}".'
)

async def _open_tc_lid(self, module_id: str) -> None:
"""Try to open the thermocycler lid."""
tc_hardware = self._equipment.get_module_hardware_api(
ThermocyclerModuleId(module_id)
)
if tc_hardware:
await tc_hardware.open()

async def raise_if_labware_in_non_open_thermocycler(
async def ensure_labware_in_open_thermocycler(
self, labware_parent: LabwareLocation
) -> None:
"""Flag unsafe movements to a Thermocycler.

If the given labware is in a Thermocycler, and that Thermocycler's lid isn't
currently open according the engine's thermocycler state as well as
the hardware API (for non-virtual modules), raises ThermocyclerNotOpenError.
the hardware API (for non-virtual modules), tries to open the thermocycler lid.
If this is unsuccessful, raises ThermocyclerNotOpenError.
If it is a virtual module, checks only for thermocycler lid state in engine.

Otherwise, no-ops.
Expand Down Expand Up @@ -81,21 +128,7 @@ async def raise_if_labware_in_non_open_thermocycler(
# There is a chance that the engine might not have the latest lid status;
# do a hardware state check just to be sure that the lid is truly open.
if not self._state_store.config.use_virtual_modules:
try:
hw_tc_lid_status = await self._get_hardware_thermocycler_lid_status(
module_id=tc_substate.module_id
)
except self._HardwareThermocyclerMissingError as e:
raise ThermocyclerNotOpenError(
"Thermocycler must be open when moving to labware inside it,"
" but can't confirm Thermocycler's current status."
) from e

if hw_tc_lid_status != ThermocyclerLidStatus.OPEN:
raise ThermocyclerNotOpenError(
f"Thermocycler must be open when moving to labware inside it,"
f' but Thermocycler is currently "{hw_tc_lid_status}".'
)
await self._verify_tc_lid_status(module_id=module_id)

async def _get_hardware_thermocycler_lid_status(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ async def test_ensure_movement_obstructed_by_thermocycler_raises(
state_store.labware.get_parent_location(labware_id="labware-id")
).then_return(from_loc)
decoy.when(
await thermocycler_movement_flagger.raise_if_labware_in_non_open_thermocycler(
await thermocycler_movement_flagger.ensure_labware_in_open_thermocycler(
labware_parent=ModuleLocation(moduleId="a-thermocycler-id")
)
).then_raise(ThermocyclerNotOpenError("Thou shall not pass!"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
)
from opentrons.protocol_engine.state.motion import PipetteLocationData
from opentrons.protocol_engine.execution.movement import MovementHandler
from opentrons.protocol_engine.execution.equipment import EquipmentHandler
from opentrons.protocol_engine.execution.thermocycler_movement_flagger import (
ThermocyclerMovementFlagger,
)
Expand Down Expand Up @@ -64,13 +65,20 @@ def mock_gantry_mover(decoy: Decoy) -> GantryMover:
return decoy.mock(cls=GantryMover)


@pytest.fixture
def mock_equipment_handler(decoy: Decoy) -> EquipmentHandler:
"""Get a mock in the shape of an EquipmentHandler."""
return decoy.mock(cls=EquipmentHandler)


@pytest.fixture
def subject(
state_store: StateStore,
hardware_api: HardwareAPI,
thermocycler_movement_flagger: ThermocyclerMovementFlagger,
heater_shaker_movement_flagger: HeaterShakerMovementFlagger,
mock_gantry_mover: GantryMover,
mock_equipment_handler: EquipmentHandler,
) -> MovementHandler:
"""Create a MovementHandler with its dependencies mocked out."""
return MovementHandler(
Expand All @@ -79,6 +87,7 @@ def subject(
thermocycler_movement_flagger=thermocycler_movement_flagger,
heater_shaker_movement_flagger=heater_shaker_movement_flagger,
gantry_mover=mock_gantry_mover,
equipment=mock_equipment_handler,
)


Expand Down Expand Up @@ -179,7 +188,7 @@ async def test_move_to_well(
assert result == Point(x=4, y=5, z=6)

decoy.verify(
await thermocycler_movement_flagger.raise_if_labware_in_non_open_thermocycler(
await thermocycler_movement_flagger.ensure_labware_in_open_thermocycler(
labware_parent=DeckSlotLocation(slotName=DeckSlotName.SLOT_1)
),
heater_shaker_movement_flagger.raise_if_movement_restricted(
Expand Down Expand Up @@ -287,7 +296,7 @@ async def test_move_to_well_from_starting_location(
assert result == Point(4, 5, 6)

decoy.verify(
await thermocycler_movement_flagger.raise_if_labware_in_non_open_thermocycler(
await thermocycler_movement_flagger.ensure_labware_in_open_thermocycler(
labware_parent=DeckSlotLocation(slotName=DeckSlotName.SLOT_1)
),
heater_shaker_movement_flagger.raise_if_movement_restricted(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from opentrons.protocol_engine.execution.thermocycler_movement_flagger import (
ThermocyclerMovementFlagger,
)
from opentrons.protocol_engine.execution.equipment import EquipmentHandler


@pytest.fixture
Expand All @@ -42,15 +43,21 @@ def state_store(decoy: Decoy) -> StateStore:
return decoy.mock(cls=StateStore)


@pytest.fixture
def equipment_handler(decoy: Decoy) -> EquipmentHandler:
"""Get a mock in the shape of an EquipmentHandler."""
return decoy.mock(cls=EquipmentHandler)


@pytest.fixture
def subject(
state_store: StateStore,
hardware_api: HardwareAPI,
equipment_handler: EquipmentHandler,
) -> ThermocyclerMovementFlagger:
"""Return a movement flagger initialized with mocked-out dependencies."""
return ThermocyclerMovementFlagger(
state_store=state_store,
hardware_api=hardware_api,
state_store=state_store, hardware_api=hardware_api, equipment=equipment_handler
)


Expand All @@ -72,7 +79,7 @@ async def test_raises_depending_on_thermocycler_substate_lid_status(
)

with pytest.raises(ThermocyclerNotOpenError):
await subject.raise_if_labware_in_non_open_thermocycler(
await subject.ensure_labware_in_open_thermocycler(
labware_parent=ModuleLocation(moduleId="module-id"),
)

Expand Down Expand Up @@ -140,9 +147,11 @@ async def test_raises_depending_on_thermocycler_hardware_lid_status(
decoy.when(thermocycler.device_info).then_return({"serial": "module-serial"})
decoy.when(thermocycler.lid_status).then_return(lid_status)
decoy.when(hardware_api.attached_modules).then_return([thermocycler])

decoy.when(
subject._equipment.get_module_hardware_api(ThermocyclerModuleId("module-id"))
).then_return(thermocycler)
with expected_raise_cm:
await subject.raise_if_labware_in_non_open_thermocycler(
await subject.ensure_labware_in_open_thermocycler(
labware_parent=ModuleLocation(moduleId="module-id"),
)

Expand Down Expand Up @@ -172,7 +181,7 @@ async def test_raises_if_hardware_module_has_gone_missing(
decoy.when(hardware_api.attached_modules).then_return([])

with pytest.raises(ThermocyclerNotOpenError):
await subject.raise_if_labware_in_non_open_thermocycler(
await subject.ensure_labware_in_open_thermocycler(
labware_parent=ModuleLocation(moduleId="module-id"),
)

Expand All @@ -194,7 +203,7 @@ async def test_passes_if_virtual_module_lid_open(
),
)
decoy.when(state_store.config.use_virtual_modules).then_return(True)
await subject.raise_if_labware_in_non_open_thermocycler(
await subject.ensure_labware_in_open_thermocycler(
labware_parent=ModuleLocation(moduleId="module-id")
)

Expand All @@ -209,7 +218,7 @@ async def test_passes_if_labware_on_non_thermocycler_module(
decoy.when(
state_store.modules.get_thermocycler_module_substate(module_id="module-id")
).then_raise(WrongModuleTypeError("Woops"))
await subject.raise_if_labware_in_non_open_thermocycler(
await subject.ensure_labware_in_open_thermocycler(
ModuleLocation(moduleId="module-id")
)

Expand All @@ -221,6 +230,6 @@ async def test_passes_if_labware_not_on_any_module(
decoy: Decoy,
) -> None:
"""It shouldn't raise if the labware isn't on a module."""
await subject.raise_if_labware_in_non_open_thermocycler(
await subject.ensure_labware_in_open_thermocycler(
DeckSlotLocation(slotName=DeckSlotName.SLOT_1)
)
Loading