From 884360400e7f9f6de8802d626f4dd3784d6e5176 Mon Sep 17 00:00:00 2001 From: lalala123123 <17938940+lalala123123@users.noreply.github.com> Date: Wed, 19 Oct 2022 20:49:36 +0800 Subject: [PATCH 01/11] support non_pipeline_parameters --- .../ai/ml/dsl/_pipeline_component_builder.py | 14 +++--- .../azure/ai/ml/dsl/_pipeline_decorator.py | 24 ++++++--- .../ai/ml/entities/_inputs_outputs/utils.py | 7 +-- sdk/ml/azure-ai-ml/azure/ai/ml/exceptions.py | 17 +++++++ .../tests/dsl/e2etests/test_dsl_pipeline.py | 50 ++++++++++++++++++- 5 files changed, 96 insertions(+), 16 deletions(-) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py index ea9aafd2cffe..b0a3ee68a82e 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py @@ -145,6 +145,7 @@ def __init__( default_datastore=None, tags=None, source_path=None, + non_pipeline_parameters=None ): self.func = func name = name if name else func.__name__ @@ -155,6 +156,7 @@ def __init__( name = func.__name__ # List of nodes, order by it's creation order in pipeline. self.nodes = [] + self.non_pipeline_parameter_names = non_pipeline_parameters or [] # A dict of inputs name to InputDefinition. # TODO: infer pipeline component input meta from assignment self.inputs = self._build_inputs(func) @@ -181,10 +183,10 @@ def add_node(self, node: Union[BaseNode, AutoMLJob]): """ self.nodes.append(node) - def build(self) -> PipelineComponent: + def build(self, non_pipeline_params_dict=None) -> PipelineComponent: # Clear nodes as we may call build multiple times. self.nodes = [] - kwargs = _build_pipeline_parameter(self.func, self._get_group_parameter_defaults()) + kwargs = _build_pipeline_parameter(self.func, self._get_group_parameter_defaults(), non_pipeline_params_dict) # We use this stack to store the dsl pipeline definition hierarchy _definition_builder_stack.push(self) @@ -218,7 +220,7 @@ def build(self) -> PipelineComponent: return pipeline_component def _build_inputs(self, func): - inputs = _get_param_with_standard_annotation(func, is_func=True) + inputs = _get_param_with_standard_annotation(func, is_func=True, skip_params=self.non_pipeline_parameter_names) for k, v in inputs.items(): # add arg description if k in self._args_description: @@ -379,7 +381,7 @@ def _get_name_or_component_name(node: Union[BaseNode, AutoMLJob]): return result -def _build_pipeline_parameter(func, kwargs=None): +def _build_pipeline_parameter(func, kwargs=None, non_pipeline_parameter_dict=None): # Pass group defaults into kwargs to support group.item can be used even if no default on function. # example: # @parameter_group @@ -391,9 +393,9 @@ def _build_pipeline_parameter(func, kwargs=None): # component_func(input=param.key) <--- param.key should be val. # transform kwargs - transformed_kwargs = {} + transformed_kwargs = non_pipeline_parameter_dict if kwargs: - transformed_kwargs.update({key: _wrap_pipeline_parameter(key, value) for key, value in kwargs.items()}) + transformed_kwargs.update({key: _wrap_pipeline_parameter(key, value) for key, value in kwargs.items() if key not in non_pipeline_parameter_dict}) def all_params(parameters): for value in parameters.values(): diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py index d43292178706..605011797115 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py @@ -10,7 +10,7 @@ from functools import wraps from inspect import Parameter, signature from pathlib import Path -from typing import Any, Callable, Dict, TypeVar +from typing import Any, Callable, Dict, TypeVar, List from azure.ai.ml.entities import Data, PipelineJob, PipelineJobSettings from azure.ai.ml.entities._builders.pipeline import Pipeline @@ -24,6 +24,8 @@ UnexpectedKeywordError, UnsupportedParameterKindError, UserErrorException, + NonExistParamValueError, + UnExpectedNonPipelineParameterTypeError, ) from ._pipeline_component_builder import PipelineComponentBuilder, _is_inside_dsl_pipeline_func @@ -57,6 +59,7 @@ def pipeline( description: str = None, experiment_name: str = None, tags: Dict[str, str] = None, + non_pipeline_parameters: List[str] = None, **kwargs, ): """Build a pipeline which contains all component nodes defined in this @@ -104,6 +107,8 @@ def sample_pipeline_func(pipeline_input, pipeline_str_param): :type experiment_name: str :param tags: The tags of pipeline component. :type tags: dict[str, str] + :param non_pipeline_parameters: The names of non pipeline parameters in dsl function parameter list. + :type non_pipeline_parameters: list[str] :param kwargs: A dictionary of additional configuration parameters. :type kwargs: dict """ @@ -111,7 +116,8 @@ def sample_pipeline_func(pipeline_input, pipeline_str_param): def pipeline_decorator(func: _TFunc) -> _TFunc: if not isinstance(func, Callable): # pylint: disable=isinstance-second-argument-not-valid-type raise UserErrorException(f"Dsl pipeline decorator accept only function type, got {type(func)}.") - + if non_pipeline_parameters and not isinstance(non_pipeline_parameters, List) or any(not isinstance(param, str) for param in non_pipeline_parameters): + raise UnExpectedNonPipelineParameterTypeError() # compute variable names changed from default_compute_targe -> compute -> default_compute -> none # to support legacy usage, we support them with priority. compute = kwargs.get("compute", None) @@ -149,6 +155,7 @@ def pipeline_decorator(func: _TFunc) -> _TFunc: default_datastore=default_datastore, tags=tags, source_path=str(func_entry_path), + non_pipeline_parameters=non_pipeline_parameters, ) @wraps(func) @@ -159,12 +166,13 @@ def wrapper(*args, **kwargs) -> PipelineJob: # Because we only want to enable dsl settings on top level pipeline _dsl_settings_stack.push() # use this stack to track on_init/on_finalize settings try: - provided_positional_args = _validate_args(func, args, kwargs) + provided_positional_args = _validate_args(func, args, kwargs, non_pipeline_parameters) # Convert args to kwargs kwargs.update(provided_positional_args) + non_pipeline_params_dict = {k: v for k, v in kwargs.items() if k in non_pipeline_parameters} # TODO: cache built pipeline component - pipeline_component = pipeline_builder.build() + pipeline_component = pipeline_builder.build(non_pipeline_params_dict=non_pipeline_params_dict) finally: # use `finally` to ensure pop operation from the stack dsl_settings = _dsl_settings_stack.pop() @@ -215,7 +223,7 @@ def wrapper(*args, **kwargs) -> PipelineJob: return pipeline_decorator -def _validate_args(func, args, kwargs): +def _validate_args(func, args, kwargs, non_pipeline_parameters): """Validate customer function args and convert them to kwargs.""" # Positional arguments validate all_parameters = [param for _, param in signature(func).parameters.items()] @@ -224,6 +232,10 @@ def _validate_args(func, args, kwargs): raise UnsupportedParameterKindError(func.__name__) all_parameter_keys = [param.name for param in all_parameters] + unexpected_non_pipeline_parameters = [param for param in non_pipeline_parameters if param not in all_parameter_keys] + if unexpected_non_pipeline_parameters: + raise NonExistParamValueError(func.__name__, unexpected_non_pipeline_parameters) + empty_parameters = {param.name: param for param in all_parameters if param.default is Parameter.empty} min_num = len(empty_parameters) max_num = len(all_parameters) @@ -250,7 +262,7 @@ def _is_supported_data_type(_data): for pipeline_input_name in provided_args: data = provided_args[pipeline_input_name] - if data is not None and not _is_supported_data_type(data): + if data is not None and not _is_supported_data_type(data) and pipeline_input_name not in non_pipeline_parameters: msg = ( "Pipeline input expected an azure.ai.ml.Input or primitive types (str, bool, int or float), " "but got type {}." diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/utils.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/utils.py index 6e1def679c85..6344374f9c92 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/utils.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/utils.py @@ -59,7 +59,7 @@ def _get_annotation_cls_by_type(t: type, raise_error=False, optional=None): # pylint: disable=too-many-statements -def _get_param_with_standard_annotation(cls_or_func, is_func=False): +def _get_param_with_standard_annotation(cls_or_func, is_func=False, skip_params=None): """Standardize function parameters or class fields with dsl.types annotation.""" # TODO: we'd better remove this potential recursive import @@ -207,6 +207,7 @@ def _split(_fields): } ) + skip_params = skip_params or [] inherited_fields = _get_inherited_fields() # From annotations get field with type annotations = getattr(cls_or_func, "__annotations__", {}) @@ -215,10 +216,10 @@ def _split(_fields): # Update fields use class field with defaults from class dict or signature(func).paramters if not is_func: # Only consider public fields in class dict - defaults_dict = {key: val for key, val in cls_or_func.__dict__.items() if not key.startswith("_")} + defaults_dict = {key: val for key, val in cls_or_func.__dict__.items() if not key.startswith("_") and key not in skip_params} else: # Infer parameter type from value if is function - defaults_dict = {key: val.default for key, val in signature(cls_or_func).parameters.items()} + defaults_dict = {key: val.default for key, val in signature(cls_or_func).parameters.items() if key not in skip_params} fields = _update_fields_with_default(annotation_fields, defaults_dict) all_fields = _merge_and_reorder(inherited_fields, fields) return all_fields diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/exceptions.py b/sdk/ml/azure-ai-ml/azure/ai/ml/exceptions.py index b01a3a3a537a..172639a61e71 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/exceptions.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/exceptions.py @@ -571,6 +571,23 @@ def __init__(self, func_name, keyword): super().__init__(message=message, no_personal_data_message=message) +class NonExistParamValueError(KeywordError): + """Exception raised when items in non_pipeline_parameters not in keyword parameters in + dynamic functions.""" + + def __init__(self, func_name, keywords): + message = "%s() got unexpected params in non_pipeline_parameters %r." % (func_name, keywords) + super().__init__(message=message, no_personal_data_message=message) + + +class UnExpectedNonPipelineParameterTypeError(UserErrorException): + """Exception raised when non_pipeline_parameter type is not List[str].""" + + def __init__(self): + message = "Type of 'non_pipeline_parameter' in dsl.pipeline should be a list of string" + super().__init__(message=message, no_personal_data_message=message) + + class UnsupportedOperationError(UserErrorException): """Exception raised when specified operation is not supported.""" diff --git a/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py b/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py index 6e0e609d38db..e169862c4114 100644 --- a/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py +++ b/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py @@ -27,7 +27,8 @@ from azure.ai.ml.entities import Component from azure.ai.ml.entities import Component as ComponentEntity from azure.ai.ml.entities import Data, PipelineJob -from azure.ai.ml.exceptions import ValidationException +from azure.ai.ml.entities._job.pipeline._io import PipelineInput +from azure.ai.ml.exceptions import ValidationException, NonExistParamValueError, UnExpectedNonPipelineParameterTypeError from azure.ai.ml.parallel import ParallelJob, RunFunction, parallel_run_function from azure.core.exceptions import HttpResponseError from azure.core.polling import LROPoller @@ -2410,3 +2411,50 @@ def pipeline_with_group(group: ParamClass): } assert actual_job["inputs"] == expected_job_inputs assert actual_job["jobs"]["microsoft_samples_command_component_basic_inputs"]["inputs"] == expected_node_inputs + + def test_pipeline_with_non_pipeline_parameters(self): + component_yaml = components_dir / "helloworld_component.yml" + component_func1 = load_component(source=component_yaml, params_override=[{"name": randstr("component_name")}]) + component_func2 = load_component(source=component_yaml, params_override=[{"name": randstr("component_name")}]) + + @dsl.pipeline(non_pipeline_parameters=["other_params"]) + def pipeline_func(job_in_number, job_in_path, other_params): + component_func1(component_in_number=job_in_number, component_in_path=job_in_path) + component_func2(component_in_number=other_params, component_in_path=job_in_path) + + pipeline = pipeline_func(10, job_input, 15) + assert "other_params" not in pipeline.inputs + assert isinstance(pipeline.jobs[component_func1.name].inputs["component_in_number"]._data, PipelineInput) + assert pipeline.jobs[component_func2.name].inputs["component_in_number"]._data == 15 + + def test_pipeline_with_invalid_non_pipeline_parameters(self): + + @dsl.pipeline(non_pipeline_parameters=[123]) + def pipeline_func(): + pass + + with pytest.raises(UnExpectedNonPipelineParameterTypeError) as error_info: + pipeline_func() + assert "Type of 'non_pipeline_parameter' in dsl.pipeline should be a list of string" in str(error_info) + + @dsl.pipeline(non_pipeline_parameters=["non_exist_param1", "non_exist_param2"]) + def pipeline_func(): + pass + + with pytest.raises(NonExistParamValueError) as error_info: + pipeline_func() + assert "pipeline_func() got unexpected params in non_pipeline_parameters ['non_exist_param1', 'non_exist_param2']" in str(error_info) + + def test_component_func_as_non_pipeline_parameters(self): + component_yaml = components_dir / "helloworld_component.yml" + component_func1 = load_component(source=component_yaml, params_override=[{"name": randstr("component_name")}]) + component_func2 = load_component(source=component_yaml, params_override=[{"name": randstr("component_name")}]) + + @dsl.pipeline(non_pipeline_parameters=["component_func"]) + def pipeline_func(job_in_number, job_in_path, component_func): + component_func1(component_in_number=job_in_number, component_in_path=job_in_path) + component_func(component_in_number=job_in_number, component_in_path=job_in_path) + + pipeline = pipeline_func(job_in_number=10, job_in_path=job_input, component_func=component_func2) + assert len(pipeline.jobs) == 2 + assert component_func2.name in pipeline.jobs From 50f6dd5051dfa4775e2caf60ca42046ce1e2684d Mon Sep 17 00:00:00 2001 From: lalala123123 <17938940+lalala123123@users.noreply.github.com> Date: Wed, 19 Oct 2022 22:18:12 +0800 Subject: [PATCH 02/11] fix test case --- sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py index 605011797115..71cb629eb8c0 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py @@ -116,7 +116,7 @@ def sample_pipeline_func(pipeline_input, pipeline_str_param): def pipeline_decorator(func: _TFunc) -> _TFunc: if not isinstance(func, Callable): # pylint: disable=isinstance-second-argument-not-valid-type raise UserErrorException(f"Dsl pipeline decorator accept only function type, got {type(func)}.") - if non_pipeline_parameters and not isinstance(non_pipeline_parameters, List) or any(not isinstance(param, str) for param in non_pipeline_parameters): + if non_pipeline_parameters and not isinstance(non_pipeline_parameters, List) and any(not isinstance(param, str) for param in non_pipeline_parameters): raise UnExpectedNonPipelineParameterTypeError() # compute variable names changed from default_compute_targe -> compute -> default_compute -> none # to support legacy usage, we support them with priority. From f7f9d9494f7ae9333c8e3a21858cca51d4438622 Mon Sep 17 00:00:00 2001 From: lalala123123 <17938940+lalala123123@users.noreply.github.com> Date: Thu, 20 Oct 2022 10:45:29 +0800 Subject: [PATCH 03/11] support non_pipeline_parameters --- sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py index 71cb629eb8c0..5e2f8c41d86a 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py @@ -166,10 +166,11 @@ def wrapper(*args, **kwargs) -> PipelineJob: # Because we only want to enable dsl settings on top level pipeline _dsl_settings_stack.push() # use this stack to track on_init/on_finalize settings try: - provided_positional_args = _validate_args(func, args, kwargs, non_pipeline_parameters) + non_pipeline_params = non_pipeline_parameters or [] + provided_positional_args = _validate_args(func, args, kwargs, non_pipeline_params) # Convert args to kwargs kwargs.update(provided_positional_args) - non_pipeline_params_dict = {k: v for k, v in kwargs.items() if k in non_pipeline_parameters} + non_pipeline_params_dict = {k: v for k, v in kwargs.items() if k in non_pipeline_params} # TODO: cache built pipeline component pipeline_component = pipeline_builder.build(non_pipeline_params_dict=non_pipeline_params_dict) @@ -232,6 +233,7 @@ def _validate_args(func, args, kwargs, non_pipeline_parameters): raise UnsupportedParameterKindError(func.__name__) all_parameter_keys = [param.name for param in all_parameters] + non_pipeline_parameters = non_pipeline_parameters or [] unexpected_non_pipeline_parameters = [param for param in non_pipeline_parameters if param not in all_parameter_keys] if unexpected_non_pipeline_parameters: raise NonExistParamValueError(func.__name__, unexpected_non_pipeline_parameters) From 65ab57c63e54381a104359b787e52c07c53de5d1 Mon Sep 17 00:00:00 2001 From: lalala123123 <17938940+lalala123123@users.noreply.github.com> Date: Thu, 20 Oct 2022 10:52:38 +0800 Subject: [PATCH 04/11] fix comment --- .../azure/ai/ml/dsl/_pipeline_decorator.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py index 5e2f8c41d86a..caaa2cbb6a2b 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py @@ -59,7 +59,6 @@ def pipeline( description: str = None, experiment_name: str = None, tags: Dict[str, str] = None, - non_pipeline_parameters: List[str] = None, **kwargs, ): """Build a pipeline which contains all component nodes defined in this @@ -107,8 +106,6 @@ def sample_pipeline_func(pipeline_input, pipeline_str_param): :type experiment_name: str :param tags: The tags of pipeline component. :type tags: dict[str, str] - :param non_pipeline_parameters: The names of non pipeline parameters in dsl function parameter list. - :type non_pipeline_parameters: list[str] :param kwargs: A dictionary of additional configuration parameters. :type kwargs: dict """ @@ -116,7 +113,9 @@ def sample_pipeline_func(pipeline_input, pipeline_str_param): def pipeline_decorator(func: _TFunc) -> _TFunc: if not isinstance(func, Callable): # pylint: disable=isinstance-second-argument-not-valid-type raise UserErrorException(f"Dsl pipeline decorator accept only function type, got {type(func)}.") - if non_pipeline_parameters and not isinstance(non_pipeline_parameters, List) and any(not isinstance(param, str) for param in non_pipeline_parameters): + + non_pipeline_parameters = kwargs.get("non_pipeline_parameters", []) + if not isinstance(non_pipeline_parameters, List) and any(not isinstance(param, str) for param in non_pipeline_parameters): raise UnExpectedNonPipelineParameterTypeError() # compute variable names changed from default_compute_targe -> compute -> default_compute -> none # to support legacy usage, we support them with priority. @@ -166,11 +165,10 @@ def wrapper(*args, **kwargs) -> PipelineJob: # Because we only want to enable dsl settings on top level pipeline _dsl_settings_stack.push() # use this stack to track on_init/on_finalize settings try: - non_pipeline_params = non_pipeline_parameters or [] - provided_positional_args = _validate_args(func, args, kwargs, non_pipeline_params) + provided_positional_args = _validate_args(func, args, kwargs, non_pipeline_parameters) # Convert args to kwargs kwargs.update(provided_positional_args) - non_pipeline_params_dict = {k: v for k, v in kwargs.items() if k in non_pipeline_params} + non_pipeline_params_dict = {k: v for k, v in kwargs.items() if k in non_pipeline_parameters} # TODO: cache built pipeline component pipeline_component = pipeline_builder.build(non_pipeline_params_dict=non_pipeline_params_dict) From f07ad0a792da6578f933c0856ebe664f4e3f4693 Mon Sep 17 00:00:00 2001 From: lalala123123 <17938940+lalala123123@users.noreply.github.com> Date: Thu, 20 Oct 2022 12:04:44 +0800 Subject: [PATCH 05/11] fix code style --- .../azure/ai/ml/dsl/_pipeline_component_builder.py | 7 ++++++- .../azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py | 6 ++++-- .../azure/ai/ml/entities/_inputs_outputs/utils.py | 10 ++++++++-- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py index b0a3ee68a82e..49bec4fbedea 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py @@ -395,7 +395,12 @@ def _build_pipeline_parameter(func, kwargs=None, non_pipeline_parameter_dict=Non # transform kwargs transformed_kwargs = non_pipeline_parameter_dict if kwargs: - transformed_kwargs.update({key: _wrap_pipeline_parameter(key, value) for key, value in kwargs.items() if key not in non_pipeline_parameter_dict}) + transformed_kwargs.update( + { + key: _wrap_pipeline_parameter(key, value) for key, value in kwargs.items() + if key not in non_pipeline_parameter_dict + } + ) def all_params(parameters): for value in parameters.values(): diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py index caaa2cbb6a2b..3458ea049e00 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py @@ -115,7 +115,8 @@ def pipeline_decorator(func: _TFunc) -> _TFunc: raise UserErrorException(f"Dsl pipeline decorator accept only function type, got {type(func)}.") non_pipeline_parameters = kwargs.get("non_pipeline_parameters", []) - if not isinstance(non_pipeline_parameters, List) and any(not isinstance(param, str) for param in non_pipeline_parameters): + if not isinstance(non_pipeline_parameters, List) and \ + any(not isinstance(param, str) for param in non_pipeline_parameters): raise UnExpectedNonPipelineParameterTypeError() # compute variable names changed from default_compute_targe -> compute -> default_compute -> none # to support legacy usage, we support them with priority. @@ -262,7 +263,8 @@ def _is_supported_data_type(_data): for pipeline_input_name in provided_args: data = provided_args[pipeline_input_name] - if data is not None and not _is_supported_data_type(data) and pipeline_input_name not in non_pipeline_parameters: + if data is not None and not _is_supported_data_type(data) and \ + pipeline_input_name not in non_pipeline_parameters: msg = ( "Pipeline input expected an azure.ai.ml.Input or primitive types (str, bool, int or float), " "but got type {}." diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/utils.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/utils.py index 6344374f9c92..1d51a818f513 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/utils.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_inputs_outputs/utils.py @@ -216,10 +216,16 @@ def _split(_fields): # Update fields use class field with defaults from class dict or signature(func).paramters if not is_func: # Only consider public fields in class dict - defaults_dict = {key: val for key, val in cls_or_func.__dict__.items() if not key.startswith("_") and key not in skip_params} + defaults_dict = { + key: val for key, val in cls_or_func.__dict__.items() + if not key.startswith("_") and key not in skip_params + } else: # Infer parameter type from value if is function - defaults_dict = {key: val.default for key, val in signature(cls_or_func).parameters.items() if key not in skip_params} + defaults_dict = { + key: val.default for key, val in signature(cls_or_func).parameters.items() + if key not in skip_params + } fields = _update_fields_with_default(annotation_fields, defaults_dict) all_fields = _merge_and_reorder(inherited_fields, fields) return all_fields From dfdd6b7873027b994d550deb4c69df611e832d8b Mon Sep 17 00:00:00 2001 From: lalala123123 <17938940+lalala123123@users.noreply.github.com> Date: Thu, 20 Oct 2022 13:22:29 +0800 Subject: [PATCH 06/11] fix test case --- .../tests/dsl/e2etests/test_dsl_pipeline.py | 50 +----------------- .../tests/dsl/unittests/test_dsl_pipeline.py | 52 ++++++++++++++++++- 2 files changed, 52 insertions(+), 50 deletions(-) diff --git a/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py b/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py index b1ecb567ab0e..e2e8c0107c30 100644 --- a/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py +++ b/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py @@ -27,8 +27,7 @@ from azure.ai.ml.entities import Component from azure.ai.ml.entities import Component as ComponentEntity from azure.ai.ml.entities import Data, PipelineJob -from azure.ai.ml.entities._job.pipeline._io import PipelineInput -from azure.ai.ml.exceptions import ValidationException, NonExistParamValueError, UnExpectedNonPipelineParameterTypeError +from azure.ai.ml.exceptions import ValidationException from azure.ai.ml.parallel import ParallelJob, RunFunction, parallel_run_function from azure.core.exceptions import HttpResponseError from azure.core.polling import LROPoller @@ -2412,53 +2411,6 @@ def pipeline_with_group(group: ParamClass): assert actual_job["inputs"] == expected_job_inputs assert actual_job["jobs"]["microsoft_samples_command_component_basic_inputs"]["inputs"] == expected_node_inputs - def test_pipeline_with_non_pipeline_parameters(self): - component_yaml = components_dir / "helloworld_component.yml" - component_func1 = load_component(source=component_yaml, params_override=[{"name": randstr("component_name")}]) - component_func2 = load_component(source=component_yaml, params_override=[{"name": randstr("component_name")}]) - - @dsl.pipeline(non_pipeline_parameters=["other_params"]) - def pipeline_func(job_in_number, job_in_path, other_params): - component_func1(component_in_number=job_in_number, component_in_path=job_in_path) - component_func2(component_in_number=other_params, component_in_path=job_in_path) - - pipeline = pipeline_func(10, job_input, 15) - assert "other_params" not in pipeline.inputs - assert isinstance(pipeline.jobs[component_func1.name].inputs["component_in_number"]._data, PipelineInput) - assert pipeline.jobs[component_func2.name].inputs["component_in_number"]._data == 15 - - def test_pipeline_with_invalid_non_pipeline_parameters(self): - - @dsl.pipeline(non_pipeline_parameters=[123]) - def pipeline_func(): - pass - - with pytest.raises(UnExpectedNonPipelineParameterTypeError) as error_info: - pipeline_func() - assert "Type of 'non_pipeline_parameter' in dsl.pipeline should be a list of string" in str(error_info) - - @dsl.pipeline(non_pipeline_parameters=["non_exist_param1", "non_exist_param2"]) - def pipeline_func(): - pass - - with pytest.raises(NonExistParamValueError) as error_info: - pipeline_func() - assert "pipeline_func() got unexpected params in non_pipeline_parameters ['non_exist_param1', 'non_exist_param2']" in str(error_info) - - def test_component_func_as_non_pipeline_parameters(self): - component_yaml = components_dir / "helloworld_component.yml" - component_func1 = load_component(source=component_yaml, params_override=[{"name": randstr("component_name")}]) - component_func2 = load_component(source=component_yaml, params_override=[{"name": randstr("component_name")}]) - - @dsl.pipeline(non_pipeline_parameters=["component_func"]) - def pipeline_func(job_in_number, job_in_path, component_func): - component_func1(component_in_number=job_in_number, component_in_path=job_in_path) - component_func(component_in_number=job_in_number, component_in_path=job_in_path) - - pipeline = pipeline_func(job_in_number=10, job_in_path=job_input, component_func=component_func2) - assert len(pipeline.jobs) == 2 - assert component_func2.name in pipeline.jobs - def test_dsl_pipeline_with_default_component( self, client: MLClient, diff --git a/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py b/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py index ccd54924d550..e136aebf8364 100644 --- a/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py +++ b/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py @@ -28,7 +28,7 @@ from azure.ai.ml.entities._builders import Command from azure.ai.ml.entities._job.pipeline._io import PipelineInput from azure.ai.ml.entities._job.pipeline._load_component import _generate_component_function -from azure.ai.ml.exceptions import UserErrorException, ValidationException +from azure.ai.ml.exceptions import UserErrorException, ValidationException, NonExistParamValueError, UnExpectedNonPipelineParameterTypeError from .._util import _DSL_TIMEOUT_SECOND @@ -1940,3 +1940,53 @@ def pipeline_func(component_in_path): pipeline_job = pipeline_func(component_in_path=Data(name="test", version="1", type=AssetTypes.MLTABLE)) result = pipeline_job._validate() assert result._to_dict() == {"result": "Succeeded"} + + def test_pipeline_with_non_pipeline_parameters(self): + component_yaml = components_dir / "helloworld_component.yml" + component_func1 = load_component(source=component_yaml, params_override=[{"name": "component_name_1"}]) + component_func2 = load_component(source=component_yaml, params_override=[{"name": "component_name_2"}]) + + @dsl.pipeline(non_pipeline_parameters=["other_params"]) + def pipeline_func(job_in_number, job_in_path, other_params): + component_func1(component_in_number=job_in_number, component_in_path=job_in_path) + component_func2(component_in_number=other_params, component_in_path=job_in_path) + + pipeline = pipeline_func(10, Input(path="/a/path/on/ds"), 15) + assert "other_params" not in pipeline.inputs + assert isinstance(pipeline.jobs[component_func1.name].inputs["component_in_number"]._data, PipelineInput) + assert pipeline.jobs[component_func2.name].inputs["component_in_number"]._data == 15 + + def test_pipeline_with_invalid_non_pipeline_parameters(self): + + @dsl.pipeline(non_pipeline_parameters=[123]) + def pipeline_func(): + pass + + with pytest.raises(UnExpectedNonPipelineParameterTypeError) as error_info: + pipeline_func() + assert "Type of 'non_pipeline_parameter' in dsl.pipeline should be a list of string" in str(error_info) + + @dsl.pipeline(non_pipeline_parameters=["non_exist_param1", "non_exist_param2"]) + def pipeline_func(): + pass + + with pytest.raises(NonExistParamValueError) as error_info: + pipeline_func() + assert "pipeline_func() got unexpected params in non_pipeline_parameters ['non_exist_param1', 'non_exist_param2']" in str(error_info) + + def test_component_func_as_non_pipeline_parameters(self): + component_yaml = components_dir / "helloworld_component.yml" + component_func1 = load_component(source=component_yaml, params_override=[{"name": "component_name_1"}]) + component_func2 = load_component(source=component_yaml, params_override=[{"name": "component_name_2"}]) + + @dsl.pipeline(non_pipeline_parameters=["component_func"]) + def pipeline_func(job_in_number, job_in_path, component_func): + component_func1(component_in_number=job_in_number, component_in_path=job_in_path) + component_func(component_in_number=job_in_number, component_in_path=job_in_path) + + pipeline = pipeline_func( + job_in_number=10, + job_in_path=Input(path="/a/path/on/ds"), + component_func=component_func2) + assert len(pipeline.jobs) == 2 + assert component_func2.name in pipeline.jobs From ba38493b5bc99c70f351dbceb27b318f9d660b30 Mon Sep 17 00:00:00 2001 From: lalala123123 <17938940+lalala123123@users.noreply.github.com> Date: Thu, 20 Oct 2022 13:24:12 +0800 Subject: [PATCH 07/11] fix test case --- .../azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py index 49bec4fbedea..c882b171d89e 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py @@ -393,7 +393,7 @@ def _build_pipeline_parameter(func, kwargs=None, non_pipeline_parameter_dict=Non # component_func(input=param.key) <--- param.key should be val. # transform kwargs - transformed_kwargs = non_pipeline_parameter_dict + transformed_kwargs = non_pipeline_parameter_dict or {} if kwargs: transformed_kwargs.update( { From a38bd920d3cb6777400281f9652724d2750a5555 Mon Sep 17 00:00:00 2001 From: lalala123123 <17938940+lalala123123@users.noreply.github.com> Date: Thu, 20 Oct 2022 14:39:27 +0800 Subject: [PATCH 08/11] fix test case --- sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py index 3458ea049e00..2057936fa9e7 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py @@ -115,7 +115,7 @@ def pipeline_decorator(func: _TFunc) -> _TFunc: raise UserErrorException(f"Dsl pipeline decorator accept only function type, got {type(func)}.") non_pipeline_parameters = kwargs.get("non_pipeline_parameters", []) - if not isinstance(non_pipeline_parameters, List) and \ + if not isinstance(non_pipeline_parameters, List) or \ any(not isinstance(param, str) for param in non_pipeline_parameters): raise UnExpectedNonPipelineParameterTypeError() # compute variable names changed from default_compute_targe -> compute -> default_compute -> none From ffac561b493ebcb228be9f93338ca41e89affa92 Mon Sep 17 00:00:00 2001 From: lalala123123 <17938940+lalala123123@users.noreply.github.com> Date: Thu, 20 Oct 2022 15:14:32 +0800 Subject: [PATCH 09/11] fix comment --- .../ai/ml/operations/_component_operations.py | 12 ++++++++++++ .../tests/dsl/e2etests/test_dsl_pipeline.py | 15 +++++++++++++++ .../tests/dsl/unittests/test_dsl_pipeline.py | 12 +++++++++--- 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_component_operations.py b/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_component_operations.py index d527365f3072..da4796e74e16 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_component_operations.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_component_operations.py @@ -577,9 +577,21 @@ def check_parameter_type(f): error_category=ErrorCategory.USER_ERROR, ) + def check_non_pipeline_parameters(f): + """Check whether non_pipeline_parameters exist in pipeline builder.""" + if f._pipeline_builder.non_pipeline_parameter_names: + msg = "Cannot register pipeline component {!r} with non_pipeline_parameters." + raise ValidationException( + message=msg.format(f.__name__), + no_personal_data_message=msg.format(""), + target=ErrorTarget.COMPONENT, + error_category=ErrorCategory.USER_ERROR, + ) + if hasattr(component_func, "_is_mldesigner_component") and component_func._is_mldesigner_component: return component_func.component if hasattr(component_func, "_is_dsl_func") and component_func._is_dsl_func: + check_non_pipeline_parameters(component_func) check_parameter_type(component_func) if component_func._job_settings: module_logger.warning( diff --git a/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py b/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py index e2e8c0107c30..c89e593889b9 100644 --- a/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py +++ b/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py @@ -1061,6 +1061,21 @@ def pipeline_missing_type( in e.value.message ) + @dsl.pipeline(non_pipeline_parameters=['param']) + def pipeline_with_non_pipeline_parameters( + required_input: Input, + required_param: str, + param: str, + ): + default_optional_func( + required_input=required_input, + required_param=required_param, + ) + + with pytest.raises(ValidationException) as e: + client.components.create_or_update(pipeline_with_non_pipeline_parameters) + assert "Cannot register pipeline component 'pipeline_func' with non_pipeline_parameters." in e.value.message + def test_create_pipeline_component_by_dsl(self, caplog, client: MLClient): default_optional_func = load_component(source=str(components_dir / "default_optional_component.yml")) diff --git a/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py b/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py index e136aebf8364..c7cdfedd308e 100644 --- a/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py +++ b/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py @@ -1946,16 +1946,22 @@ def test_pipeline_with_non_pipeline_parameters(self): component_func1 = load_component(source=component_yaml, params_override=[{"name": "component_name_1"}]) component_func2 = load_component(source=component_yaml, params_override=[{"name": "component_name_2"}]) - @dsl.pipeline(non_pipeline_parameters=["other_params"]) - def pipeline_func(job_in_number, job_in_path, other_params): + @dsl.pipeline(non_pipeline_parameters=["other_params", "is_add_component"]) + def pipeline_func(job_in_number, job_in_path, other_params, is_add_component): component_func1(component_in_number=job_in_number, component_in_path=job_in_path) component_func2(component_in_number=other_params, component_in_path=job_in_path) + if is_add_component: + component_func2(component_in_number=other_params, component_in_path=job_in_path) - pipeline = pipeline_func(10, Input(path="/a/path/on/ds"), 15) + pipeline = pipeline_func(10, Input(path="/a/path/on/ds"), 15, False) + assert len(pipeline.jobs) == 2 assert "other_params" not in pipeline.inputs assert isinstance(pipeline.jobs[component_func1.name].inputs["component_in_number"]._data, PipelineInput) assert pipeline.jobs[component_func2.name].inputs["component_in_number"]._data == 15 + pipeline = pipeline_func(10, Input(path="/a/path/on/ds"), 15, True) + assert len(pipeline.jobs) == 3 + def test_pipeline_with_invalid_non_pipeline_parameters(self): @dsl.pipeline(non_pipeline_parameters=[123]) From fa2ca2366978b4334f7d81d335c1533e4b2378d0 Mon Sep 17 00:00:00 2001 From: lalala123123 <17938940+lalala123123@users.noreply.github.com> Date: Thu, 20 Oct 2022 16:19:23 +0800 Subject: [PATCH 10/11] fix test case --- sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py | 6 +++--- sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py index 2057936fa9e7..d9cf0b94f74e 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py @@ -115,9 +115,6 @@ def pipeline_decorator(func: _TFunc) -> _TFunc: raise UserErrorException(f"Dsl pipeline decorator accept only function type, got {type(func)}.") non_pipeline_parameters = kwargs.get("non_pipeline_parameters", []) - if not isinstance(non_pipeline_parameters, List) or \ - any(not isinstance(param, str) for param in non_pipeline_parameters): - raise UnExpectedNonPipelineParameterTypeError() # compute variable names changed from default_compute_targe -> compute -> default_compute -> none # to support legacy usage, we support them with priority. compute = kwargs.get("compute", None) @@ -225,6 +222,9 @@ def wrapper(*args, **kwargs) -> PipelineJob: def _validate_args(func, args, kwargs, non_pipeline_parameters): """Validate customer function args and convert them to kwargs.""" + if not isinstance(non_pipeline_parameters, List) or \ + any(not isinstance(param, str) for param in non_pipeline_parameters): + raise UnExpectedNonPipelineParameterTypeError() # Positional arguments validate all_parameters = [param for _, param in signature(func).parameters.items()] # Implicit parameter are *args and **kwargs diff --git a/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py b/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py index c89e593889b9..6c491d430c71 100644 --- a/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py +++ b/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py @@ -1074,7 +1074,7 @@ def pipeline_with_non_pipeline_parameters( with pytest.raises(ValidationException) as e: client.components.create_or_update(pipeline_with_non_pipeline_parameters) - assert "Cannot register pipeline component 'pipeline_func' with non_pipeline_parameters." in e.value.message + assert "Cannot register pipeline component 'pipeline_with_non_pipeline_parameters' with non_pipeline_parameters." in e.value.message def test_create_pipeline_component_by_dsl(self, caplog, client: MLClient): default_optional_func = load_component(source=str(components_dir / "default_optional_component.yml")) From 4ce93c098c9c1a649636a000a66625db8ad6de2e Mon Sep 17 00:00:00 2001 From: lalala123123 <17938940+lalala123123@users.noreply.github.com> Date: Sun, 23 Oct 2022 20:36:11 +0800 Subject: [PATCH 11/11] fix comment --- .../ai/ml/dsl/_pipeline_component_builder.py | 4 +-- .../azure/ai/ml/dsl/_pipeline_decorator.py | 24 ++++++++--------- sdk/ml/azure-ai-ml/azure/ai/ml/exceptions.py | 4 +-- .../ai/ml/operations/_component_operations.py | 8 +++--- .../tests/dsl/e2etests/test_dsl_pipeline.py | 8 +++--- .../tests/dsl/unittests/test_dsl_pipeline.py | 26 +++++++++++++------ 6 files changed, 42 insertions(+), 32 deletions(-) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py index c882b171d89e..256dd63cd301 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_component_builder.py @@ -145,7 +145,7 @@ def __init__( default_datastore=None, tags=None, source_path=None, - non_pipeline_parameters=None + non_pipeline_inputs=None ): self.func = func name = name if name else func.__name__ @@ -156,7 +156,7 @@ def __init__( name = func.__name__ # List of nodes, order by it's creation order in pipeline. self.nodes = [] - self.non_pipeline_parameter_names = non_pipeline_parameters or [] + self.non_pipeline_parameter_names = non_pipeline_inputs or [] # A dict of inputs name to InputDefinition. # TODO: infer pipeline component input meta from assignment self.inputs = self._build_inputs(func) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py index d9cf0b94f74e..6babfa111166 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py @@ -114,7 +114,7 @@ def pipeline_decorator(func: _TFunc) -> _TFunc: if not isinstance(func, Callable): # pylint: disable=isinstance-second-argument-not-valid-type raise UserErrorException(f"Dsl pipeline decorator accept only function type, got {type(func)}.") - non_pipeline_parameters = kwargs.get("non_pipeline_parameters", []) + non_pipeline_inputs = kwargs.get("non_pipeline_inputs", []) or kwargs.get("non_pipeline_parameters", []) # compute variable names changed from default_compute_targe -> compute -> default_compute -> none # to support legacy usage, we support them with priority. compute = kwargs.get("compute", None) @@ -152,7 +152,7 @@ def pipeline_decorator(func: _TFunc) -> _TFunc: default_datastore=default_datastore, tags=tags, source_path=str(func_entry_path), - non_pipeline_parameters=non_pipeline_parameters, + non_pipeline_inputs=non_pipeline_inputs, ) @wraps(func) @@ -163,10 +163,10 @@ def wrapper(*args, **kwargs) -> PipelineJob: # Because we only want to enable dsl settings on top level pipeline _dsl_settings_stack.push() # use this stack to track on_init/on_finalize settings try: - provided_positional_args = _validate_args(func, args, kwargs, non_pipeline_parameters) + provided_positional_args = _validate_args(func, args, kwargs, non_pipeline_inputs) # Convert args to kwargs kwargs.update(provided_positional_args) - non_pipeline_params_dict = {k: v for k, v in kwargs.items() if k in non_pipeline_parameters} + non_pipeline_params_dict = {k: v for k, v in kwargs.items() if k in non_pipeline_inputs} # TODO: cache built pipeline component pipeline_component = pipeline_builder.build(non_pipeline_params_dict=non_pipeline_params_dict) @@ -220,10 +220,10 @@ def wrapper(*args, **kwargs) -> PipelineJob: return pipeline_decorator -def _validate_args(func, args, kwargs, non_pipeline_parameters): +def _validate_args(func, args, kwargs, non_pipeline_inputs): """Validate customer function args and convert them to kwargs.""" - if not isinstance(non_pipeline_parameters, List) or \ - any(not isinstance(param, str) for param in non_pipeline_parameters): + if not isinstance(non_pipeline_inputs, List) or \ + any(not isinstance(param, str) for param in non_pipeline_inputs): raise UnExpectedNonPipelineParameterTypeError() # Positional arguments validate all_parameters = [param for _, param in signature(func).parameters.items()] @@ -232,10 +232,10 @@ def _validate_args(func, args, kwargs, non_pipeline_parameters): raise UnsupportedParameterKindError(func.__name__) all_parameter_keys = [param.name for param in all_parameters] - non_pipeline_parameters = non_pipeline_parameters or [] - unexpected_non_pipeline_parameters = [param for param in non_pipeline_parameters if param not in all_parameter_keys] - if unexpected_non_pipeline_parameters: - raise NonExistParamValueError(func.__name__, unexpected_non_pipeline_parameters) + non_pipeline_inputs = non_pipeline_inputs or [] + unexpected_non_pipeline_inputs = [param for param in non_pipeline_inputs if param not in all_parameter_keys] + if unexpected_non_pipeline_inputs: + raise NonExistParamValueError(func.__name__, unexpected_non_pipeline_inputs) empty_parameters = {param.name: param for param in all_parameters if param.default is Parameter.empty} min_num = len(empty_parameters) @@ -264,7 +264,7 @@ def _is_supported_data_type(_data): for pipeline_input_name in provided_args: data = provided_args[pipeline_input_name] if data is not None and not _is_supported_data_type(data) and \ - pipeline_input_name not in non_pipeline_parameters: + pipeline_input_name not in non_pipeline_inputs: msg = ( "Pipeline input expected an azure.ai.ml.Input or primitive types (str, bool, int or float), " "but got type {}." diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/exceptions.py b/sdk/ml/azure-ai-ml/azure/ai/ml/exceptions.py index 172639a61e71..5631168bc98b 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/exceptions.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/exceptions.py @@ -572,11 +572,11 @@ def __init__(self, func_name, keyword): class NonExistParamValueError(KeywordError): - """Exception raised when items in non_pipeline_parameters not in keyword parameters in + """Exception raised when items in non_pipeline_inputs not in keyword parameters in dynamic functions.""" def __init__(self, func_name, keywords): - message = "%s() got unexpected params in non_pipeline_parameters %r." % (func_name, keywords) + message = "%s() got unexpected params in non_pipeline_inputs %r." % (func_name, keywords) super().__init__(message=message, no_personal_data_message=message) diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_component_operations.py b/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_component_operations.py index da4796e74e16..2d8fdff3a43c 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_component_operations.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_component_operations.py @@ -577,10 +577,10 @@ def check_parameter_type(f): error_category=ErrorCategory.USER_ERROR, ) - def check_non_pipeline_parameters(f): - """Check whether non_pipeline_parameters exist in pipeline builder.""" + def check_non_pipeline_inputs(f): + """Check whether non_pipeline_inputs exist in pipeline builder.""" if f._pipeline_builder.non_pipeline_parameter_names: - msg = "Cannot register pipeline component {!r} with non_pipeline_parameters." + msg = "Cannot register pipeline component {!r} with non_pipeline_inputs." raise ValidationException( message=msg.format(f.__name__), no_personal_data_message=msg.format(""), @@ -591,7 +591,7 @@ def check_non_pipeline_parameters(f): if hasattr(component_func, "_is_mldesigner_component") and component_func._is_mldesigner_component: return component_func.component if hasattr(component_func, "_is_dsl_func") and component_func._is_dsl_func: - check_non_pipeline_parameters(component_func) + check_non_pipeline_inputs(component_func) check_parameter_type(component_func) if component_func._job_settings: module_logger.warning( diff --git a/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py b/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py index 6c491d430c71..24afdeaa0df4 100644 --- a/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py +++ b/sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py @@ -1061,8 +1061,8 @@ def pipeline_missing_type( in e.value.message ) - @dsl.pipeline(non_pipeline_parameters=['param']) - def pipeline_with_non_pipeline_parameters( + @dsl.pipeline(non_pipeline_inputs=['param']) + def pipeline_with_non_pipeline_inputs( required_input: Input, required_param: str, param: str, @@ -1073,8 +1073,8 @@ def pipeline_with_non_pipeline_parameters( ) with pytest.raises(ValidationException) as e: - client.components.create_or_update(pipeline_with_non_pipeline_parameters) - assert "Cannot register pipeline component 'pipeline_with_non_pipeline_parameters' with non_pipeline_parameters." in e.value.message + client.components.create_or_update(pipeline_with_non_pipeline_inputs) + assert "Cannot register pipeline component 'pipeline_with_non_pipeline_inputs' with non_pipeline_inputs." in e.value.message def test_create_pipeline_component_by_dsl(self, caplog, client: MLClient): default_optional_func = load_component(source=str(components_dir / "default_optional_component.yml")) diff --git a/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py b/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py index c7cdfedd308e..f3e1c8d149e7 100644 --- a/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py +++ b/sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py @@ -1941,12 +1941,12 @@ def pipeline_func(component_in_path): result = pipeline_job._validate() assert result._to_dict() == {"result": "Succeeded"} - def test_pipeline_with_non_pipeline_parameters(self): + def test_pipeline_with_non_pipeline_inputs(self): component_yaml = components_dir / "helloworld_component.yml" component_func1 = load_component(source=component_yaml, params_override=[{"name": "component_name_1"}]) component_func2 = load_component(source=component_yaml, params_override=[{"name": "component_name_2"}]) - @dsl.pipeline(non_pipeline_parameters=["other_params", "is_add_component"]) + @dsl.pipeline(non_pipeline_inputs=["other_params", "is_add_component"]) def pipeline_func(job_in_number, job_in_path, other_params, is_add_component): component_func1(component_in_number=job_in_number, component_in_path=job_in_path) component_func2(component_in_number=other_params, component_in_path=job_in_path) @@ -1962,9 +1962,19 @@ def pipeline_func(job_in_number, job_in_path, other_params, is_add_component): pipeline = pipeline_func(10, Input(path="/a/path/on/ds"), 15, True) assert len(pipeline.jobs) == 3 - def test_pipeline_with_invalid_non_pipeline_parameters(self): + @dsl.pipeline(non_pipeline_parameters=["other_params", "is_add_component"]) + def pipeline_func(job_in_number, job_in_path, other_params, is_add_component): + component_func1(component_in_number=job_in_number, component_in_path=job_in_path) + component_func2(component_in_number=other_params, component_in_path=job_in_path) + if is_add_component: + component_func2(component_in_number=other_params, component_in_path=job_in_path) + + pipeline = pipeline_func(10, Input(path="/a/path/on/ds"), 15, True) + assert len(pipeline.jobs) == 3 + + def test_pipeline_with_invalid_non_pipeline_inputs(self): - @dsl.pipeline(non_pipeline_parameters=[123]) + @dsl.pipeline(non_pipeline_inputs=[123]) def pipeline_func(): pass @@ -1972,20 +1982,20 @@ def pipeline_func(): pipeline_func() assert "Type of 'non_pipeline_parameter' in dsl.pipeline should be a list of string" in str(error_info) - @dsl.pipeline(non_pipeline_parameters=["non_exist_param1", "non_exist_param2"]) + @dsl.pipeline(non_pipeline_inputs=["non_exist_param1", "non_exist_param2"]) def pipeline_func(): pass with pytest.raises(NonExistParamValueError) as error_info: pipeline_func() - assert "pipeline_func() got unexpected params in non_pipeline_parameters ['non_exist_param1', 'non_exist_param2']" in str(error_info) + assert "pipeline_func() got unexpected params in non_pipeline_inputs ['non_exist_param1', 'non_exist_param2']" in str(error_info) - def test_component_func_as_non_pipeline_parameters(self): + def test_component_func_as_non_pipeline_inputs(self): component_yaml = components_dir / "helloworld_component.yml" component_func1 = load_component(source=component_yaml, params_override=[{"name": "component_name_1"}]) component_func2 = load_component(source=component_yaml, params_override=[{"name": "component_name_2"}]) - @dsl.pipeline(non_pipeline_parameters=["component_func"]) + @dsl.pipeline(non_pipeline_inputs=["component_func"]) def pipeline_func(job_in_number, job_in_path, component_func): component_func1(component_in_number=job_in_number, component_in_path=job_in_path) component_func(component_in_number=job_in_number, component_in_path=job_in_path)