Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
e525b90
refactor: split orchestrate to enhance maintainability
michael-johnston Nov 12, 2025
08f55ff
Merge remote-tracking branch 'origin/main' into maj_fix_operation_in_…
michael-johnston Nov 12, 2025
8842423
fix: typing
michael-johnston Nov 12, 2025
de000a0
refactor: provide functions via orchestrate
michael-johnston Nov 12, 2025
9e9c060
feat(core): Convert RayTaskError to original error
michael-johnston Nov 13, 2025
7951172
docs(core): Update exceptions raised
michael-johnston Nov 13, 2025
40f0542
feat(core): Improve granularity of exception handling
michael-johnston Nov 13, 2025
4cba376
Merge remote-tracking branch 'origin/main' into maj_fix_operation_in_…
michael-johnston Nov 13, 2025
0e53c76
Merge remote-tracking branch 'origin/main' into maj_fix_operation_in_…
michael-johnston Nov 13, 2025
c97df6d
Merge remote-tracking branch 'origin/main' into maj_fix_operation_in_…
michael-johnston Nov 13, 2025
a0a0bc6
chore(core): merge log change from main
michael-johnston Nov 13, 2025
dced8ce
refactor: update to new import
michael-johnston Nov 13, 2025
bb7a007
refactor: expose required members via orchestrate
michael-johnston Nov 13, 2025
f17530d
fix: Access via public interface
michael-johnston Nov 13, 2025
f14973a
Merge remote-tracking branch 'origin/main' into maj_fix_operation_in_…
michael-johnston Nov 17, 2025
f95dbac
refactor: orchestrate params
michael-johnston Nov 17, 2025
4576ca2
refactor: remove project_context arg
michael-johnston Nov 17, 2025
1bb9e67
refactor(orchestration): remove queue arg
michael-johnston Nov 17, 2025
177824f
refactor(orchestration): rename param
michael-johnston Nov 18, 2025
c7943e1
refactor(orchestration): remove queue arg
michael-johnston Nov 18, 2025
3ed858c
refactor(orchestration): Remove BaseOperationRunConfiguration
michael-johnston Nov 18, 2025
35e0721
chore(comments): delete comment
michael-johnston Nov 18, 2025
547d3a1
refactor(orchestrate): move acquiring ac_config
michael-johnston Nov 18, 2025
5c0d194
refactor(orchestrate): actuator configuration validation
michael-johnston Nov 18, 2025
1814f2e
Merge remote-tracking branch 'origin/main' into maj_fix_operation_in_…
michael-johnston Nov 19, 2025
15136f3
fix: imports
michael-johnston Nov 19, 2025
2fc4a24
fix: temporarily use private module
michael-johnston Nov 19, 2025
cb00a17
refactor(orchestration): add_operation_from_configuration_to_metastore
michael-johnston Nov 21, 2025
1e35ee2
refactor(orchestration): _run_operation_harness
michael-johnston Nov 21, 2025
b561455
refactor(orchestration): orchestrate_explore_operation
michael-johnston Nov 21, 2025
b12afe8
refactor(orchestration): setup operator
michael-johnston Nov 21, 2025
72ea4bd
fix(orchestration): Check operation_info namespace
michael-johnston Nov 21, 2025
ffb3b4f
feat(core): Add namespace field to FunctionOperation
michael-johnston Nov 21, 2025
08f13da
refactor(orchestrate): update to new interface
michael-johnston Nov 21, 2025
41c6265
fix(orchestrate): Pass namespace to general operators
michael-johnston Nov 21, 2025
1019893
refactor(orchestrate): Update orchestrate
michael-johnston Nov 21, 2025
d2f8ea9
Merge branch 'main' into maj_fix_operation_in_operation
michael-johnston Nov 21, 2025
4bffb1c
refactor(orchestrate): Remove unused parameter
michael-johnston Nov 21, 2025
f25e73c
docs(operator): update to new function
michael-johnston Nov 21, 2025
656505c
docs(orchestrate): update docstrings
michael-johnston Nov 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions orchestrator/cli/resources/operation/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,9 @@ def create_operation(parameters: AdoCreateCommandParameters):

try:
operation_output = orchestrator.modules.operators.orchestrate.orchestrate(
base_operation_configuration=op_resource_configuration,
operation_resource_configuration=op_resource_configuration,
project_context=parameters.ado_configuration.project_context,
discovery_space_identifier=op_resource_configuration.spaces[0],
discovery_space_configuration=None,
)

except MeasurementError as e:
Expand Down
269 changes: 175 additions & 94 deletions orchestrator/core/operation/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,141 @@ class DiscoveryOperationEnum(enum.Enum):
EXPORT = "export"


def get_actuator_configurations(
project_context: ProjectContext, actuator_configuration_identifiers: list[str]
) -> list[ActuatorConfiguration]:
"""Retrieves actuator configurations from the metastore

Fetches ActuatorConfiguration resources from the metastore using the provided
identifiers and validates that each actuator has at most one configuration.

Params:
project_context: Project context for connecting to the metastore
actuator_configuration_identifiers: List of identifiers for actuator
configuration resources to retrieve

Returns:
List of ActuatorConfiguration instances retrieved from the metastore

Raises:
ValueError: If more than one ActuatorConfiguration references the same actuator
"""
import orchestrator.metastore.sqlstore

sql = orchestrator.metastore.sqlstore.SQLStore(project_context=project_context)

actuator_configurations = [
sql.getResource(
identifier=identifier,
kind=CoreResourceKinds.ACTUATORCONFIGURATION,
raise_error_if_no_resource=True,
).config
Comment on lines +65 to +69
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code can also raise a ResourceDoesNotExistError - I think I forgot to update the docstring for the interface method

for identifier in actuator_configuration_identifiers
]

actuator_identifiers = {conf.actuatorIdentifier for conf in actuator_configurations}
if len(actuator_identifiers) != len(actuator_configuration_identifiers):
raise ValueError("Only one ActuatorConfiguration is permitted per Actuator")

return actuator_configurations


def validate_actuator_configurations_against_space_configuration(
actuator_configurations: list[ActuatorConfiguration],
discovery_space_configuration: DiscoverySpaceConfiguration,
) -> list[ActuatorConfiguration]:
"""Validates that actuator configurations are compatible with a discovery space

Checks that all actuators referenced in the actuator configurations are used
in the experiments defined in the discovery space configuration.

Params:
actuator_configurations: List of actuator configurations to validate
discovery_space_configuration: The discovery space configuration to validate against

Returns:
The same list of actuator_configurations if validation passes
Comment on lines +93 to +94
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth returning the exact same input parameter? We could have this return nothing and just raise an error if validation fails


Raises:
ValueError: If any actuator identifier in actuator_configurations does not
appear in the experiments of the discovery space
"""
actuator_identifiers = {conf.actuatorIdentifier for conf in actuator_configurations}

# Check the actuators configurations refer to actuators used in the MeasurementSpace
# The experiment identifiers are in two different locations
if isinstance(
discovery_space_configuration.experiments, MeasurementSpaceConfiguration
):
experiment_actuator_identifiers = {
experiment.actuatorIdentifier
for experiment in discovery_space_configuration.experiments.experiments
}
else:
experiment_actuator_identifiers = {
experiment.actuatorIdentifier
for experiment in discovery_space_configuration.experiments
}

if not experiment_actuator_identifiers.issuperset(actuator_identifiers):
raise ValueError(
f"Actuator Identifiers {actuator_identifiers} must appear in the experiments of its space"
)

return actuator_configurations


def validate_actuator_configuration_ids_against_space_ids(
actuator_configuration_identifiers: list[str],
space_identifiers: list[str],
project_context: ProjectContext,
):
"""Validates actuator configuration identifiers against space identifiers

Retrieves actuator configurations and space configurations from the metastore,
then validates that all actuator configurations are compatible with all specified
discovery spaces.

Params:
actuator_configuration_identifiers: List of actuator configuration resource
identifiers to validate
space_identifiers: List of discovery space resource identifiers to validate against
project_context: Project context for connecting to the metastore

Returns:
List of ActuatorConfiguration instances that were validated

Raises:
ValueError: If any actuator configuration is not compatible with any of the
discovery spaces, or if more than one ActuatorConfiguration references
the same actuator
"""
import orchestrator.metastore.sqlstore

sql = orchestrator.metastore.sqlstore.SQLStore(project_context=project_context)
space_configurations: list[DiscoverySpaceConfiguration] = [
sql.getResource(
identifier=identifier,
kind=CoreResourceKinds.DISCOVERYSPACE,
raise_error_if_no_resource=True,
).config
Comment on lines +154 to +158
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can also raise ResourceDoesNotExistError

for identifier in space_identifiers
]

actuator_configurations = get_actuator_configurations(
project_context=project_context,
actuator_configuration_identifiers=actuator_configuration_identifiers,
)

for config in space_configurations:
validate_actuator_configurations_against_space_configuration(
actuator_configurations=actuator_configurations,
discovery_space_configuration=config,
)

return actuator_configurations


class OperatorModuleConf(ModuleConf):
moduleType: ModuleTypeEnum = pydantic.Field(default=ModuleTypeEnum.OPERATION)

Expand Down Expand Up @@ -128,10 +263,8 @@ class DiscoveryOperationConfiguration(pydantic.BaseModel):
)


class BaseOperationRunConfiguration(pydantic.BaseModel):
"""Field shared by OrchestratorRunConfiguration and OperationResourceConfiguration

both are models used to run an operation"""
class DiscoveryOperationResourceConfiguration(pydantic.BaseModel):
"""Pydantic model used to define an operation"""

operation: DiscoveryOperationConfiguration
metadata: ConfigurationMetadata = pydantic.Field(
Expand All @@ -140,13 +273,27 @@ class BaseOperationRunConfiguration(pydantic.BaseModel):
"Two optional keys that are used by convention are name and description",
)
actuatorConfigurationIdentifiers: list[str] = pydantic.Field(default=[])
spaces: list[str] = pydantic.Field(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be consistent with actuatorConfigurationIdentifiers, this should probably be spaceIdentifiers

description="List of ids of the spaces the operation will be applied to"
)
model_config = ConfigDict(
extra="forbid",
json_schema_extra={
"version": importlib.metadata.version(distribution_name="ado-core")
},
)

@pydantic.field_validator("spaces")
def check_space_set(cls, value):
"""Checks at least one space identifier has been given"""

if len(value) == 0:
raise ValueError(
"You must provide at least one space identifier to an operation"
)

return value

Comment on lines +286 to +296
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be replaced by adding min_length=1 to the Pydantic field

def get_actuatorconfigurations(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is kind of useless at the moment - if we push the early-exit condition for empty actuatorConfigurationIdentifiers into the get_actuator_configurations method, we can remove this

self, project_context: ProjectContext
) -> list[ActuatorConfiguration]:
Expand All @@ -163,115 +310,49 @@ def get_actuatorconfigurations(
Raises: ValueError if more than one ActuatorConfigurationResource references the same actuator
"""

import orchestrator.metastore.sqlstore

if not self.actuatorConfigurationIdentifiers:
return []

sql = orchestrator.metastore.sqlstore.SQLStore(project_context=project_context)

actuator_configurations = [
sql.getResource(
identifier=identifier,
kind=CoreResourceKinds.ACTUATORCONFIGURATION,
raise_error_if_no_resource=True,
).config
for identifier in self.actuatorConfigurationIdentifiers
]

actuator_identifiers = {
conf.actuatorIdentifier for conf in actuator_configurations
}
if len(actuator_identifiers) != len(self.actuatorConfigurationIdentifiers):
raise ValueError("Only one ActuatorConfiguration is permitted per Actuator")

return actuator_configurations

def validate_actuatorconfigurations_against_space(
self,
project_context: ProjectContext,
discoverySpaceConfiguration: DiscoverySpaceConfiguration,
) -> list[ActuatorConfiguration]:

actuator_configurations = self.get_actuatorconfigurations(
project_context=project_context
return get_actuator_configurations(
project_context=project_context,
actuator_configuration_identifiers=self.actuatorConfigurationIdentifiers,
)
actuator_identifiers = {
conf.actuatorIdentifier for conf in actuator_configurations
}

# Check the actuators configurations refer to actuators used in the MeasurementSpace
# The experiment identifiers are in two different locations
if isinstance(
discoverySpaceConfiguration.experiments, MeasurementSpaceConfiguration
):
experiment_actuator_identifiers = {
experiment.actuatorIdentifier
for experiment in discoverySpaceConfiguration.experiments.experiments
}
else:
experiment_actuator_identifiers = {
experiment.actuatorIdentifier
for experiment in discoverySpaceConfiguration.experiments
}

if not experiment_actuator_identifiers.issuperset(actuator_identifiers):
raise ValueError(
f"Actuator Identifiers {actuator_identifiers} must appear in the experiments of its space"
)

return actuator_configurations


class DiscoveryOperationResourceConfiguration(BaseOperationRunConfiguration):

spaces: list[str] = pydantic.Field(
description="The spaces the operation will be applied to"
)

@pydantic.field_validator("spaces")
def check_space_set(cls, value):
"""Checks at least one space identifier has been given"""

if len(value) == 0:
raise ValueError(
"You must provide at least one space identifier to an operation"
)

return value

def validate_actuatorconfigurations(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also seems redundant now

self, project_context: ProjectContext
) -> list[ActuatorConfiguration]:
"""Gets and validates the actuator configuration resources referenced by actuatorConfigurationIdentifiers from the metastore if any

from orchestrator.core.discoveryspace.space import DiscoverySpace
This also requires getting the configuration of the discovery space

actuator_configurations: list[ActuatorConfiguration] = []
for space in self.spaces:
discovery_space = DiscoverySpace.from_stored_configuration(
project_context=project_context,
space_identifier=space,
)
Params:
project_context: Information for connection to the metastore

actuator_configurations.extend(
super().validate_actuatorconfigurations_against_space(
project_context=project_context,
discoverySpaceConfiguration=discovery_space.config,
)
)
Returns:
A list of ActuatorConfigurationResource instances. The list will be empty if
there are no actuatorConfigurationIdentifiers.

return actuator_configurations

Raises: ValueError if more than one ActuatorConfigurationResource references the same actuator
"""

return validate_actuator_configuration_ids_against_space_ids(
actuator_configuration_identifiers=self.actuatorConfigurationIdentifiers,
space_identifiers=self.spaces,
project_context=project_context,
)

class FunctionOperationInfo(pydantic.BaseModel):
"""Class for holding information for operations executed via operator functions

Operators implemented as functions may need additional information.
Rather than have these as multiple params we gather them in this model"""
class FunctionOperationInfo(pydantic.BaseModel):
"""Class for providing information to operator functions"""

metadata: ConfigurationMetadata = pydantic.Field(
default=ConfigurationMetadata(),
description="User defined metadata about the configuration. A set of keys and values. "
"Two optional keys that are used by convention are name and description",
)
actuatorConfigurationIdentifiers: list[str] = pydantic.Field(default=[])
namespace: str | None = pydantic.Field(
description="The namespace the operation should create ray workers/actors in",
default=None,
)
Comment on lines +355 to +358
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd call this more explicitly ray_namespace

Loading