diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/pipeline_job.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/pipeline_job.py index 59180fa5abfb..2f94e2d268e1 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/pipeline_job.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/pipeline_job.py @@ -331,8 +331,8 @@ def _validate_init_finalize_job(self) -> MutableValidationResult: validation_result.append_error(yaml_path="jobs", message="No other job except for on_init/on_finalize job.") def _is_isolated_job(_validate_job_name: str) -> bool: - # no input to validate job _validate_job = self.jobs[_validate_job_name] + # no input to validate job for _input_name in _validate_job.inputs: if not hasattr(_validate_job.inputs[_input_name]._data, "_data_binding"): continue @@ -349,19 +349,29 @@ def _is_isolated_job(_validate_job_name: str) -> bool: return False return True + def _is_control_flow_node(_validate_job_name: str) -> bool: + from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode + + _validate_job = self.jobs[_validate_job_name] + return issubclass(type(_validate_job), ControlFlowNode) + # validate on_init if on_init is not None: if on_init not in self.jobs: append_on_init_error(f"On_init job name {on_init} not exists in jobs.") else: - if not _is_isolated_job(on_init): + if _is_control_flow_node(on_init): + append_on_init_error("On_init job should not be a control flow node.") + elif not _is_isolated_job(on_init): append_on_init_error("On_init job should not have connection to other execution node.") # validate on_finalize if on_finalize is not None: if on_finalize not in self.jobs: append_on_finalize_error(f"On_finalize job name {on_finalize} not exists in jobs.") else: - if not _is_isolated_job(on_finalize): + if _is_control_flow_node(on_finalize): + append_on_finalize_error("On_finalize job should not be a control flow node.") + elif not _is_isolated_job(on_finalize): append_on_finalize_error("On_finalize job should not have connection to other execution node.") return validation_result diff --git a/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_init_finalize_job.py b/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_init_finalize_job.py index 3a53c7754c55..3640aeea8bd7 100644 --- a/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_init_finalize_job.py +++ b/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_init_finalize_job.py @@ -144,7 +144,7 @@ def subgraph_with_init_func(): subgraph_with_init_func() assert str(e.value) == "On_init/on_finalize is not supported for pipeline component." - def test_init_finalize_job_with_subgraph(self, caplog) -> None: + def test_init_finalize_job_with_subgraph(self) -> None: from azure.ai.ml._internal.dsl import set_pipeline_settings # happy path