From 42819fc8e50e2126887ed9136f828a6ab3e09c3f Mon Sep 17 00:00:00 2001 From: Abhishek Munagekar <10258799+munagekar@users.noreply.github.com> Date: Tue, 24 May 2022 01:42:06 +0900 Subject: [PATCH] feat: support autoscaling metrics when deploying models (#1197) * feat: support autoscaling metrics when deploying models * feat: support model deploy to endpoint with autoscaling metrics * fix autoscaling_target_accelerator_duty_cycle check * fix docstring: specify that autoscaling_params are optional * bug fix: add autoscaling_target_cpu_utilization to custom_resource_spec * add tests * add _TEST_METRIC_NAME_CPU_UTILIZATION and _TEST_METRIC_NAME_GPU_UTILIZATION * remove not required arguments in tests * fix tests: wait for LRO to complete even if not sync * fix lint: run black Co-authored-by: Sara Robinson --- google/cloud/aiplatform/models.py | 104 ++++++++++++++++-- tests/unit/aiplatform/test_endpoints.py | 139 ++++++++++++++++++++++++ 2 files changed, 234 insertions(+), 9 deletions(-) diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index 95f3044cbeb..eb9b1a00426 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -644,6 +644,8 @@ def deploy( metadata: Optional[Sequence[Tuple[str, str]]] = (), sync=True, deploy_request_timeout: Optional[float] = None, + autoscaling_target_cpu_utilization: Optional[int] = None, + autoscaling_target_accelerator_duty_cycle: Optional[int] = None, ) -> None: """Deploys a Model to the Endpoint. @@ -717,6 +719,13 @@ def deploy( be immediately returned and synced when the Future has completed. deploy_request_timeout (float): Optional. The timeout for the deploy request in seconds. + autoscaling_target_cpu_utilization (int): + Target CPU Utilization to use for Autoscaling Replicas. + A default value of 60 will be used if not specified. + autoscaling_target_accelerator_duty_cycle (int): + Target Accelerator Duty Cycle. + Must also set accelerator_type and accelerator_count if specified. + A default value of 60 will be used if not specified. """ self._sync_gca_resource_if_skipped() @@ -747,6 +756,8 @@ def deploy( metadata=metadata, sync=sync, deploy_request_timeout=deploy_request_timeout, + autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization, + autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle, ) @base.optional_sync() @@ -767,6 +778,8 @@ def _deploy( metadata: Optional[Sequence[Tuple[str, str]]] = (), sync=True, deploy_request_timeout: Optional[float] = None, + autoscaling_target_cpu_utilization: Optional[int] = None, + autoscaling_target_accelerator_duty_cycle: Optional[int] = None, ) -> None: """Deploys a Model to the Endpoint. @@ -840,6 +853,13 @@ def _deploy( be immediately returned and synced when the Future has completed. deploy_request_timeout (float): Optional. The timeout for the deploy request in seconds. + autoscaling_target_cpu_utilization (int): + Target CPU Utilization to use for Autoscaling Replicas. + A default value of 60 will be used if not specified. + autoscaling_target_accelerator_duty_cycle (int): + Target Accelerator Duty Cycle. + Must also set accelerator_type and accelerator_count if specified. + A default value of 60 will be used if not specified. Raises: ValueError: If there is not current traffic split and traffic percentage is not 0 or 100. @@ -866,6 +886,8 @@ def _deploy( explanation_parameters=explanation_parameters, metadata=metadata, deploy_request_timeout=deploy_request_timeout, + autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization, + autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle, ) _LOGGER.log_action_completed_against_resource("model", "deployed", self) @@ -892,6 +914,8 @@ def _deploy_call( explanation_parameters: Optional[explain.ExplanationParameters] = None, metadata: Optional[Sequence[Tuple[str, str]]] = (), deploy_request_timeout: Optional[float] = None, + autoscaling_target_cpu_utilization: Optional[int] = None, + autoscaling_target_accelerator_duty_cycle: Optional[int] = None, ): """Helper method to deploy model to endpoint. @@ -965,6 +989,13 @@ def _deploy_call( be immediately returned and synced when the Future has completed. deploy_request_timeout (float): Optional. The timeout for the deploy request in seconds. + autoscaling_target_cpu_utilization (int): + Optional. Target CPU Utilization to use for Autoscaling Replicas. + A default value of 60 will be used if not specified. + autoscaling_target_accelerator_duty_cycle (int): + Optional. Target Accelerator Duty Cycle. + Must also set accelerator_type and accelerator_count if specified. + A default value of 60 will be used if not specified. Raises: ValueError: If there is not current traffic split and traffic percentage is not 0 or 100. @@ -980,6 +1011,14 @@ def _deploy_call( "Both `accelerator_type` and `accelerator_count` should be specified or None." ) + if autoscaling_target_accelerator_duty_cycle is not None and ( + not accelerator_type or not accelerator_count + ): + raise ValueError( + "Both `accelerator_type` and `accelerator_count` should be set " + "when specifying autoscaling_target_accelerator_duty_cycle`" + ) + deployed_model = gca_endpoint_compat.DeployedModel( model=model.resource_name, display_name=deployed_model_display_name, @@ -995,7 +1034,11 @@ def _deploy_call( in model.supported_deployment_resources_types ) provided_custom_machine_spec = ( - machine_type or accelerator_type or accelerator_count + machine_type + or accelerator_type + or accelerator_count + or autoscaling_target_accelerator_duty_cycle + or autoscaling_target_cpu_utilization ) # If the model supports both automatic and dedicated deployment resources, @@ -1007,7 +1050,9 @@ def _deploy_call( if provided_custom_machine_spec and not use_dedicated_resources: _LOGGER.info( "Model does not support dedicated deployment resources. " - "The machine_type, accelerator_type and accelerator_count parameters are ignored." + "The machine_type, accelerator_type and accelerator_count," + "autoscaling_target_accelerator_duty_cycle," + "autoscaling_target_cpu_utilization parameters are ignored." ) if use_dedicated_resources and not machine_type: @@ -1015,22 +1060,41 @@ def _deploy_call( _LOGGER.info(f"Using default machine_type: {machine_type}") if use_dedicated_resources: + + dedicated_resources = gca_machine_resources_compat.DedicatedResources( + min_replica_count=min_replica_count, + max_replica_count=max_replica_count, + ) + machine_spec = gca_machine_resources_compat.MachineSpec( machine_type=machine_type ) + if autoscaling_target_cpu_utilization: + autoscaling_metric_spec = gca_machine_resources_compat.AutoscalingMetricSpec( + metric_name="aiplatform.googleapis.com/prediction/online/cpu/utilization", + target=autoscaling_target_cpu_utilization, + ) + dedicated_resources.autoscaling_metric_specs.extend( + [autoscaling_metric_spec] + ) + if accelerator_type and accelerator_count: utils.validate_accelerator_type(accelerator_type) machine_spec.accelerator_type = accelerator_type machine_spec.accelerator_count = accelerator_count - deployed_model.dedicated_resources = ( - gca_machine_resources_compat.DedicatedResources( - machine_spec=machine_spec, - min_replica_count=min_replica_count, - max_replica_count=max_replica_count, - ) - ) + if autoscaling_target_accelerator_duty_cycle: + autoscaling_metric_spec = gca_machine_resources_compat.AutoscalingMetricSpec( + metric_name="aiplatform.googleapis.com/prediction/online/accelerator/duty_cycle", + target=autoscaling_target_accelerator_duty_cycle, + ) + dedicated_resources.autoscaling_metric_specs.extend( + [autoscaling_metric_spec] + ) + + dedicated_resources.machine_spec = machine_spec + deployed_model.dedicated_resources = dedicated_resources elif supports_automatic_resources: deployed_model.automatic_resources = ( @@ -1995,6 +2059,8 @@ def deploy( encryption_spec_key_name: Optional[str] = None, sync=True, deploy_request_timeout: Optional[float] = None, + autoscaling_target_cpu_utilization: Optional[int] = None, + autoscaling_target_accelerator_duty_cycle: Optional[int] = None, ) -> Endpoint: """Deploys model to endpoint. Endpoint will be created if unspecified. @@ -2079,6 +2145,13 @@ def deploy( be immediately returned and synced when the Future has completed. deploy_request_timeout (float): Optional. The timeout for the deploy request in seconds. + autoscaling_target_cpu_utilization (int): + Optional. Target CPU Utilization to use for Autoscaling Replicas. + A default value of 60 will be used if not specified. + autoscaling_target_accelerator_duty_cycle (int): + Optional. Target Accelerator Duty Cycle. + Must also set accelerator_type and accelerator_count if specified. + A default value of 60 will be used if not specified. Returns: endpoint ("Endpoint"): Endpoint with the deployed model. @@ -2113,6 +2186,8 @@ def deploy( or initializer.global_config.encryption_spec_key_name, sync=sync, deploy_request_timeout=deploy_request_timeout, + autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization, + autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle, ) @base.optional_sync(return_input_arg="endpoint", bind_future_to_self=False) @@ -2134,6 +2209,8 @@ def _deploy( encryption_spec_key_name: Optional[str] = None, sync: bool = True, deploy_request_timeout: Optional[float] = None, + autoscaling_target_cpu_utilization: Optional[int] = None, + autoscaling_target_accelerator_duty_cycle: Optional[int] = None, ) -> Endpoint: """Deploys model to endpoint. Endpoint will be created if unspecified. @@ -2218,6 +2295,13 @@ def _deploy( be immediately returned and synced when the Future has completed. deploy_request_timeout (float): Optional. The timeout for the deploy request in seconds. + autoscaling_target_cpu_utilization (int): + Optional. Target CPU Utilization to use for Autoscaling Replicas. + A default value of 60 will be used if not specified. + autoscaling_target_accelerator_duty_cycle (int): + Optional. Target Accelerator Duty Cycle. + Must also set accelerator_type and accelerator_count if specified. + A default value of 60 will be used if not specified. Returns: endpoint ("Endpoint"): Endpoint with the deployed model. @@ -2253,6 +2337,8 @@ def _deploy( explanation_parameters=explanation_parameters, metadata=metadata, deploy_request_timeout=deploy_request_timeout, + autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization, + autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle, ) _LOGGER.log_action_completed_against_resource("model", "deployed", endpoint) diff --git a/tests/unit/aiplatform/test_endpoints.py b/tests/unit/aiplatform/test_endpoints.py index 22e308e1f57..8908bcdce82 100644 --- a/tests/unit/aiplatform/test_endpoints.py +++ b/tests/unit/aiplatform/test_endpoints.py @@ -107,6 +107,13 @@ _TEST_ACCELERATOR_TYPE = "NVIDIA_TESLA_P100" _TEST_ACCELERATOR_COUNT = 2 +_TEST_METRIC_NAME_CPU_UTILIZATION = ( + "aiplatform.googleapis.com/prediction/online/cpu/utilization" +) +_TEST_METRIC_NAME_GPU_UTILIZATION = ( + "aiplatform.googleapis.com/prediction/online/accelerator/duty_cycle" +) + _TEST_EXPLANATIONS = [gca_prediction_service.explanation.Explanation(attributions=[])] _TEST_ATTRIBUTIONS = [ @@ -1058,6 +1065,138 @@ def test_deploy_with_dedicated_resources(self, deploy_model_mock, sync): timeout=None, ) + @pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock") + @pytest.mark.parametrize("sync", [True, False]) + def test_deploy_with_autoscaling_target_cpu_utilization( + self, deploy_model_mock, sync + ): + test_endpoint = models.Endpoint(_TEST_ENDPOINT_NAME) + test_model = models.Model(_TEST_ID) + test_model._gca_resource.supported_deployment_resources_types.append( + aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES + ) + test_endpoint.deploy( + model=test_model, + machine_type=_TEST_MACHINE_TYPE, + service_account=_TEST_SERVICE_ACCOUNT, + sync=sync, + deploy_request_timeout=None, + autoscaling_target_cpu_utilization=70, + ) + + if not sync: + test_endpoint.wait() + + expected_machine_spec = gca_machine_resources.MachineSpec( + machine_type=_TEST_MACHINE_TYPE, + ) + + expected_autoscaling_metric_spec = gca_machine_resources.AutoscalingMetricSpec( + metric_name=_TEST_METRIC_NAME_CPU_UTILIZATION, + target=70, + ) + + expected_dedicated_resources = gca_machine_resources.DedicatedResources( + machine_spec=expected_machine_spec, + min_replica_count=1, + max_replica_count=1, + ) + expected_dedicated_resources.autoscaling_metric_specs.extend( + [expected_autoscaling_metric_spec] + ) + + expected_deployed_model = gca_endpoint.DeployedModel( + dedicated_resources=expected_dedicated_resources, + model=test_model.resource_name, + display_name=None, + service_account=_TEST_SERVICE_ACCOUNT, + ) + deploy_model_mock.assert_called_once_with( + endpoint=test_endpoint.resource_name, + deployed_model=expected_deployed_model, + traffic_split={"0": 100}, + metadata=(), + timeout=None, + ) + + @pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock") + @pytest.mark.parametrize("sync", [True, False]) + def test_deploy_with_autoscaling_target_accelerator_duty_cycle( + self, deploy_model_mock, sync + ): + test_endpoint = models.Endpoint(_TEST_ENDPOINT_NAME) + test_model = models.Model(_TEST_ID) + test_model._gca_resource.supported_deployment_resources_types.append( + aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES + ) + test_endpoint.deploy( + model=test_model, + machine_type=_TEST_MACHINE_TYPE, + accelerator_type=_TEST_ACCELERATOR_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + service_account=_TEST_SERVICE_ACCOUNT, + sync=sync, + deploy_request_timeout=None, + autoscaling_target_accelerator_duty_cycle=70, + ) + + if not sync: + test_endpoint.wait() + + expected_machine_spec = gca_machine_resources.MachineSpec( + machine_type=_TEST_MACHINE_TYPE, + accelerator_type=_TEST_ACCELERATOR_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + ) + + expected_autoscaling_metric_spec = gca_machine_resources.AutoscalingMetricSpec( + metric_name=_TEST_METRIC_NAME_GPU_UTILIZATION, + target=70, + ) + + expected_dedicated_resources = gca_machine_resources.DedicatedResources( + machine_spec=expected_machine_spec, + min_replica_count=1, + max_replica_count=1, + ) + expected_dedicated_resources.autoscaling_metric_specs.extend( + [expected_autoscaling_metric_spec] + ) + + expected_deployed_model = gca_endpoint.DeployedModel( + dedicated_resources=expected_dedicated_resources, + model=test_model.resource_name, + display_name=None, + service_account=_TEST_SERVICE_ACCOUNT, + ) + deploy_model_mock.assert_called_once_with( + endpoint=test_endpoint.resource_name, + deployed_model=expected_deployed_model, + traffic_split={"0": 100}, + metadata=(), + timeout=None, + ) + + @pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock") + @pytest.mark.parametrize("sync", [True, False]) + def test_deploy_with_autoscaling_target_accelerator_duty_cycle_and_no_accelerator_type_or_count_raises( + self, sync + ): + with pytest.raises(ValueError): + test_endpoint = models.Endpoint(_TEST_ENDPOINT_NAME) + test_model = models.Model(_TEST_ID) + test_model._gca_resource.supported_deployment_resources_types.append( + aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES + ) + test_endpoint.deploy( + model=test_model, + sync=sync, + autoscaling_target_accelerator_duty_cycle=70, + ) + + if not sync: + test_endpoint.wait() + @pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock") @pytest.mark.parametrize("sync", [True, False]) def test_deploy_with_explanations(self, deploy_model_with_explanations_mock, sync):