Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 38 additions & 66 deletions api/src/opentrons/protocol_engine/commands/aspirate_while_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from ..errors.exceptions import PipetteNotReadyToAspirateError
from opentrons.hardware_control import HardwareControlAPI
from ..state.update_types import CLEAR
from ..types import CurrentWell, DeckPoint
from ..types import DeckPoint

if TYPE_CHECKING:
from ..execution import PipettingHandler, GantryMover, MovementHandler
Expand Down Expand Up @@ -104,25 +104,15 @@ async def execute(self, params: AspirateWhileTrackingParams) -> _ExecuteReturn:
" The first aspirate following a blow-out must be from a specific well"
" so the plunger can be reset in a known safe position."
)

current_position = await self._gantry_mover.get_position(params.pipetteId)
current_location = self._state_view.pipettes.get_current_location()

state_update = StateUpdate()
current_well = CurrentWell(
pipette_id=params.pipetteId,
labware_id=params.labwareId,
well_name=params.wellName,
)

move_result = await move_to_well(
movement=self._movement,
model_utils=self._model_utils,
pipette_id=params.pipetteId,
labware_id=params.labwareId,
well_name=params.wellName,
well_location=params.wellLocation,
current_well=current_well,
operation_volume=-params.volume,
)
state_update.append(move_result.state_update)
if isinstance(move_result, DefinedErrorData):
Expand All @@ -138,9 +128,9 @@ async def execute(self, params: AspirateWhileTrackingParams) -> _ExecuteReturn:
flow_rate=params.flowRate,
location_if_error={
"retryLocation": (
current_position.x,
current_position.y,
current_position.z,
move_result.public.position.x,
move_result.public.position.y,
move_result.public.position.z,
)
},
command_note_adder=self._command_note_adder,
Expand All @@ -156,58 +146,40 @@ async def execute(self, params: AspirateWhileTrackingParams) -> _ExecuteReturn:
z=position_after_aspirate.z,
)
if isinstance(aspirate_result, DefinedErrorData):
if (
isinstance(current_location, CurrentWell)
and current_location.pipette_id == params.pipetteId
):
return DefinedErrorData(
public=aspirate_result.public,
state_update=aspirate_result.state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=CLEAR,
),
state_update_if_false_positive=aspirate_result.state_update_if_false_positive,
)
else:
return aspirate_result
else:
if (
isinstance(current_location, CurrentWell)
and current_location.pipette_id == params.pipetteId
):
return SuccessData(
public=AspirateWhileTrackingResult(
volume=aspirate_result.public.volume,
position=result_deck_point,
),
state_update=aspirate_result.state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=-aspirate_result.public.volume
* self._state_view.geometry.get_nozzles_per_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
),
)
else:
return SuccessData(
public=AspirateWhileTrackingResult(
volume=aspirate_result.public.volume,
position=result_deck_point,
return DefinedErrorData(
public=aspirate_result.public,
state_update=aspirate_result.state_update.set_liquid_operated(
labware_id=params.labwareId,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
params.labwareId,
params.wellName,
params.pipetteId,
),
state_update=aspirate_result.state_update,
)
volume_added=CLEAR,
),
state_update_if_false_positive=aspirate_result.state_update_if_false_positive,
)

return SuccessData(
public=AspirateWhileTrackingResult(
volume=aspirate_result.public.volume,
position=result_deck_point,
),
state_update=aspirate_result.state_update.set_liquid_operated(
labware_id=params.labwareId,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
params.labwareId,
params.wellName,
params.pipetteId,
),
volume_added=-aspirate_result.public.volume
* self._state_view.geometry.get_nozzles_per_well(
params.labwareId,
params.wellName,
params.pipetteId,
),
),
)


class AspirateWhileTracking(
Expand Down
108 changes: 36 additions & 72 deletions api/src/opentrons/protocol_engine/commands/dispense_while_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pydantic.json_schema import SkipJsonSchema

from ..state.update_types import CLEAR, StateUpdate
from ..types import CurrentWell, DeckPoint
from ..types import DeckPoint
from .pipetting_common import (
PipetteIdMixin,
DispenseVolumeMixin,
Expand Down Expand Up @@ -99,13 +99,6 @@ async def execute(self, params: DispenseWhileTrackingParams) -> _ExecuteReturn:

# TODO(pbm, 10-15-24): call self._state_view.geometry.validate_dispense_volume_into_well()

current_location = self._state_view.pipettes.get_current_location()
current_position = await self._gantry_mover.get_position(params.pipetteId)
current_well = CurrentWell(
pipette_id=params.pipetteId,
labware_id=params.labwareId,
well_name=params.wellName,
)
state_update = StateUpdate()
move_result = await move_to_well(
movement=self._movement,
Expand All @@ -114,8 +107,6 @@ async def execute(self, params: DispenseWhileTrackingParams) -> _ExecuteReturn:
labware_id=params.labwareId,
well_name=params.wellName,
well_location=params.wellLocation,
current_well=current_well,
operation_volume=-params.volume,
)
state_update.append(move_result.state_update)
if isinstance(move_result, DefinedErrorData):
Expand All @@ -132,9 +123,9 @@ async def execute(self, params: DispenseWhileTrackingParams) -> _ExecuteReturn:
push_out=params.pushOut,
location_if_error={
"retryLocation": (
current_position.x,
current_position.y,
current_position.z,
move_result.public.position.x,
move_result.public.position.y,
move_result.public.position.z,
)
},
pipetting=self._pipetting,
Expand All @@ -150,67 +141,40 @@ async def execute(self, params: DispenseWhileTrackingParams) -> _ExecuteReturn:
)

if isinstance(dispense_result, DefinedErrorData):
if (
isinstance(current_location, CurrentWell)
and current_location.pipette_id == params.pipetteId
):
return DefinedErrorData(
public=dispense_result.public,
state_update=dispense_result.state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=CLEAR,
),
state_update_if_false_positive=dispense_result.state_update_if_false_positive,
)
else:
return dispense_result
else:
if (
isinstance(current_location, CurrentWell)
and current_location.pipette_id == params.pipetteId
):
volume_added = (
self._state_view.pipettes.get_liquid_dispensed_by_ejecting_volume(
pipette_id=params.pipetteId,
volume=dispense_result.public.volume,
)
)
if volume_added is not None:
volume_added *= self._state_view.geometry.get_nozzles_per_well(
current_location.labware_id,
current_location.well_name,
return DefinedErrorData(
public=dispense_result.public,
state_update=dispense_result.state_update.set_liquid_operated(
labware_id=params.labwareId,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
params.labwareId,
params.wellName,
params.pipetteId,
)
return SuccessData(
public=DispenseWhileTrackingResult(
volume=dispense_result.public.volume,
position=result_deck_point,
),
state_update=dispense_result.state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=volume_added
if volume_added is not None
else CLEAR,
),
)
else:
return SuccessData(
public=DispenseWhileTrackingResult(
volume=dispense_result.public.volume,
position=result_deck_point,
),
state_update=dispense_result.state_update,
)
volume_added=CLEAR,
),
state_update_if_false_positive=dispense_result.state_update_if_false_positive,
)

return SuccessData(
public=DispenseWhileTrackingResult(
volume=dispense_result.public.volume,
position=result_deck_point,
),
state_update=dispense_result.state_update.set_liquid_operated(
labware_id=params.labwareId,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
params.labwareId,
params.wellName,
params.pipetteId,
),
volume_added=dispense_result.public.volume
* self._state_view.geometry.get_nozzles_per_well(
params.labwareId,
params.wellName,
params.pipetteId,
),
),
)


class DispenseWhileTracking(
Expand Down
Loading