diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py
index 1016b717bf..803ab4b159 100644
--- a/google/cloud/aiplatform/models.py
+++ b/google/cloud/aiplatform/models.py
@@ -643,6 +643,8 @@ def deploy(
         metadata: Optional[Sequence[Tuple[str, str]]] = (),
         sync=True,
         deploy_request_timeout: Optional[float] = None,
+        autoscaling_target_cpu_utilization: Optional[int] = None,
+        autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
     ) -> None:
         """Deploys a Model to the Endpoint.

@@ -716,6 +718,13 @@ def deploy(
                 be immediately returned and synced when the Future has completed.
             deploy_request_timeout (float):
                 Optional. The timeout for the deploy request in seconds.
+            autoscaling_target_cpu_utilization (int):
+                Optional. Target CPU utilization for autoscaling replicas.
+                A default value of 60 will be used if not specified.
+            autoscaling_target_accelerator_duty_cycle (int):
+                Optional. Target accelerator duty cycle.
+                accelerator_type and accelerator_count must also be set if specified.
+                A default value of 60 will be used if not specified.
         """
         self._sync_gca_resource_if_skipped()

@@ -746,6 +755,8 @@ def deploy(
             metadata=metadata,
             sync=sync,
             deploy_request_timeout=deploy_request_timeout,
+            autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization,
+            autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle,
         )

     @base.optional_sync()
@@ -766,6 +777,8 @@ def _deploy(
         metadata: Optional[Sequence[Tuple[str, str]]] = (),
         sync=True,
         deploy_request_timeout: Optional[float] = None,
+        autoscaling_target_cpu_utilization: Optional[int] = None,
+        autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
     ) -> None:
         """Deploys a Model to the Endpoint.

@@ -839,6 +852,13 @@ def _deploy(
                 be immediately returned and synced when the Future has completed.
             deploy_request_timeout (float):
                 Optional. The timeout for the deploy request in seconds.
+            autoscaling_target_cpu_utilization (int):
+                Optional. Target CPU utilization for autoscaling replicas.
+                A default value of 60 will be used if not specified.
+            autoscaling_target_accelerator_duty_cycle (int):
+                Optional. Target accelerator duty cycle.
+                accelerator_type and accelerator_count must also be set if specified.
+                A default value of 60 will be used if not specified.
         Raises:
             ValueError: If there is not current traffic split and traffic percentage
                 is not 0 or 100.
@@ -865,6 +885,8 @@ def _deploy(
             explanation_parameters=explanation_parameters,
             metadata=metadata,
             deploy_request_timeout=deploy_request_timeout,
+            autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization,
+            autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle,
         )

         _LOGGER.log_action_completed_against_resource("model", "deployed", self)
@@ -891,6 +913,8 @@ def _deploy_call(
         explanation_parameters: Optional[explain.ExplanationParameters] = None,
         metadata: Optional[Sequence[Tuple[str, str]]] = (),
         deploy_request_timeout: Optional[float] = None,
+        autoscaling_target_cpu_utilization: Optional[int] = None,
+        autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
     ):
         """Helper method to deploy model to endpoint.

@@ -964,6 +988,13 @@ def _deploy_call(
                 be immediately returned and synced when the Future has completed.
             deploy_request_timeout (float):
                 Optional. The timeout for the deploy request in seconds.
+            autoscaling_target_cpu_utilization (int):
+                Optional. Target CPU utilization for autoscaling replicas.
+                A default value of 60 will be used if not specified.
+            autoscaling_target_accelerator_duty_cycle (int):
+                Optional. Target accelerator duty cycle.
+                accelerator_type and accelerator_count must also be set if specified.
+                A default value of 60 will be used if not specified.
         Raises:
             ValueError: If there is not current traffic split and traffic percentage
                 is not 0 or 100.
@@ -979,6 +1010,14 @@ def _deploy_call(
                 "Both `accelerator_type` and `accelerator_count` should be specified or None."
             )

+        if autoscaling_target_accelerator_duty_cycle is not None and (
+            not accelerator_type or not accelerator_count
+        ):
+            raise ValueError(
+                "Both `accelerator_type` and `accelerator_count` should be set "
+                "when specifying `autoscaling_target_accelerator_duty_cycle`."
+            )
+
         deployed_model = gca_endpoint_compat.DeployedModel(
             model=model.resource_name,
             display_name=deployed_model_display_name,
@@ -994,7 +1033,11 @@ def _deploy_call(
             in model.supported_deployment_resources_types
         )
         provided_custom_machine_spec = (
-            machine_type or accelerator_type or accelerator_count
+            machine_type
+            or accelerator_type
+            or accelerator_count
+            or autoscaling_target_accelerator_duty_cycle
+            or autoscaling_target_cpu_utilization
         )

         # If the model supports both automatic and dedicated deployment resources,
@@ -1006,7 +1049,9 @@ def _deploy_call(
         if provided_custom_machine_spec and not use_dedicated_resources:
             _LOGGER.info(
                 "Model does not support dedicated deployment resources. "
-                "The machine_type, accelerator_type and accelerator_count parameters are ignored."
+                "The machine_type, accelerator_type, accelerator_count, "
+                "autoscaling_target_accelerator_duty_cycle, and "
+                "autoscaling_target_cpu_utilization parameters are ignored."
             )

         if use_dedicated_resources and not machine_type:
@@ -1014,22 +1059,41 @@ def _deploy_call(
             _LOGGER.info(f"Using default machine_type: {machine_type}")

         if use_dedicated_resources:
+
+            dedicated_resources = gca_machine_resources_compat.DedicatedResources(
+                min_replica_count=min_replica_count,
+                max_replica_count=max_replica_count,
+            )
+
             machine_spec = gca_machine_resources_compat.MachineSpec(
                 machine_type=machine_type
             )

+            if autoscaling_target_cpu_utilization:
+                autoscaling_metric_spec = gca_machine_resources_compat.AutoscalingMetricSpec(
+                    metric_name="aiplatform.googleapis.com/prediction/online/cpu/utilization",
+                    target=autoscaling_target_cpu_utilization,
+                )
+                dedicated_resources.autoscaling_metric_specs.extend(
+                    [autoscaling_metric_spec]
+                )
+
             if accelerator_type and accelerator_count:
                 utils.validate_accelerator_type(accelerator_type)
                 machine_spec.accelerator_type = accelerator_type
                 machine_spec.accelerator_count = accelerator_count

-            deployed_model.dedicated_resources = (
-                gca_machine_resources_compat.DedicatedResources(
-                    machine_spec=machine_spec,
-                    min_replica_count=min_replica_count,
-                    max_replica_count=max_replica_count,
-                )
-            )
+                if autoscaling_target_accelerator_duty_cycle:
+                    autoscaling_metric_spec = gca_machine_resources_compat.AutoscalingMetricSpec(
+                        metric_name="aiplatform.googleapis.com/prediction/online/accelerator/duty_cycle",
+                        target=autoscaling_target_accelerator_duty_cycle,
+                    )
+                    dedicated_resources.autoscaling_metric_specs.extend(
+                        [autoscaling_metric_spec]
+                    )
+
+            dedicated_resources.machine_spec = machine_spec
+            deployed_model.dedicated_resources = dedicated_resources

         elif supports_automatic_resources:
             deployed_model.automatic_resources = (
@@ -1994,6 +2058,8 @@ def deploy(
         encryption_spec_key_name: Optional[str] = None,
         sync=True,
         deploy_request_timeout: Optional[float] = None,
+        autoscaling_target_cpu_utilization: Optional[int] = None,
+        autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
     ) -> Endpoint:
         """Deploys model to endpoint. Endpoint will be created if unspecified.

@@ -2078,6 +2144,13 @@ def deploy(
                 be immediately returned and synced when the Future has completed.
             deploy_request_timeout (float):
                 Optional. The timeout for the deploy request in seconds.
+            autoscaling_target_cpu_utilization (int):
+                Optional. Target CPU utilization for autoscaling replicas.
+                A default value of 60 will be used if not specified.
+            autoscaling_target_accelerator_duty_cycle (int):
+                Optional. Target accelerator duty cycle.
+                accelerator_type and accelerator_count must also be set if specified.
+                A default value of 60 will be used if not specified.
         Returns:
             endpoint ("Endpoint"):
                 Endpoint with the deployed model.
@@ -2112,6 +2185,8 @@ def deploy(
             or initializer.global_config.encryption_spec_key_name,
             sync=sync,
             deploy_request_timeout=deploy_request_timeout,
+            autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization,
+            autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle,
         )

     @base.optional_sync(return_input_arg="endpoint", bind_future_to_self=False)
@@ -2133,6 +2208,8 @@ def _deploy(
         encryption_spec_key_name: Optional[str] = None,
         sync: bool = True,
         deploy_request_timeout: Optional[float] = None,
+        autoscaling_target_cpu_utilization: Optional[int] = None,
+        autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
     ) -> Endpoint:
         """Deploys model to endpoint. Endpoint will be created if unspecified.

@@ -2217,6 +2294,13 @@ def _deploy(
                 be immediately returned and synced when the Future has completed.
             deploy_request_timeout (float):
                 Optional. The timeout for the deploy request in seconds.
+            autoscaling_target_cpu_utilization (int):
+                Optional. Target CPU utilization for autoscaling replicas.
+                A default value of 60 will be used if not specified.
+            autoscaling_target_accelerator_duty_cycle (int):
+                Optional. Target accelerator duty cycle.
+                accelerator_type and accelerator_count must also be set if specified.
+                A default value of 60 will be used if not specified.
         Returns:
             endpoint ("Endpoint"):
                 Endpoint with the deployed model.
@@ -2252,6 +2336,8 @@ def _deploy(
             explanation_parameters=explanation_parameters,
             metadata=metadata,
             deploy_request_timeout=deploy_request_timeout,
+            autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization,
+            autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle,
         )

         _LOGGER.log_action_completed_against_resource("model", "deployed", endpoint)
diff --git a/tests/unit/aiplatform/test_endpoints.py b/tests/unit/aiplatform/test_endpoints.py
index 70e4f0d2b0..509eabb57c 100644
--- a/tests/unit/aiplatform/test_endpoints.py
+++ b/tests/unit/aiplatform/test_endpoints.py
@@ -103,6 +103,13 @@
 _TEST_ACCELERATOR_TYPE = "NVIDIA_TESLA_P100"
 _TEST_ACCELERATOR_COUNT = 2

+_TEST_METRIC_NAME_CPU_UTILIZATION = (
+    "aiplatform.googleapis.com/prediction/online/cpu/utilization"
+)
+_TEST_METRIC_NAME_GPU_UTILIZATION = (
+    "aiplatform.googleapis.com/prediction/online/accelerator/duty_cycle"
+)
+
 _TEST_EXPLANATIONS = [gca_prediction_service.explanation.Explanation(attributions=[])]

 _TEST_ATTRIBUTIONS = [
@@ -1054,6 +1061,138 @@ def test_deploy_with_dedicated_resources(self, deploy_model_mock, sync):
             timeout=None,
         )

+    @pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock")
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_deploy_with_autoscaling_target_cpu_utilization(
+        self, deploy_model_mock, sync
+    ):
+        test_endpoint = models.Endpoint(_TEST_ENDPOINT_NAME)
+        test_model = models.Model(_TEST_ID)
+        test_model._gca_resource.supported_deployment_resources_types.append(
+            aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES
+        )
+        test_endpoint.deploy(
+            model=test_model,
+            machine_type=_TEST_MACHINE_TYPE,
+            service_account=_TEST_SERVICE_ACCOUNT,
+            sync=sync,
+            deploy_request_timeout=None,
+            autoscaling_target_cpu_utilization=70,
+        )
+
+        if not sync:
+            test_endpoint.wait()
+
+        expected_machine_spec = gca_machine_resources.MachineSpec(
+            machine_type=_TEST_MACHINE_TYPE,
+        )
+
+        expected_autoscaling_metric_spec = gca_machine_resources.AutoscalingMetricSpec(
+            metric_name=_TEST_METRIC_NAME_CPU_UTILIZATION,
+            target=70,
+        )
+
+        expected_dedicated_resources = gca_machine_resources.DedicatedResources(
+            machine_spec=expected_machine_spec,
+            min_replica_count=1,
+            max_replica_count=1,
+        )
+        expected_dedicated_resources.autoscaling_metric_specs.extend(
+            [expected_autoscaling_metric_spec]
+        )
+
+        expected_deployed_model = gca_endpoint.DeployedModel(
+            dedicated_resources=expected_dedicated_resources,
+            model=test_model.resource_name,
+            display_name=None,
+            service_account=_TEST_SERVICE_ACCOUNT,
+        )
+        deploy_model_mock.assert_called_once_with(
+            endpoint=test_endpoint.resource_name,
+            deployed_model=expected_deployed_model,
+            traffic_split={"0": 100},
+            metadata=(),
+            timeout=None,
+        )
+
+    @pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock")
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_deploy_with_autoscaling_target_accelerator_duty_cycle(
+        self, deploy_model_mock, sync
+    ):
+        test_endpoint = models.Endpoint(_TEST_ENDPOINT_NAME)
+        test_model = models.Model(_TEST_ID)
+        test_model._gca_resource.supported_deployment_resources_types.append(
+            aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES
+        )
+        test_endpoint.deploy(
+            model=test_model,
+            machine_type=_TEST_MACHINE_TYPE,
+            accelerator_type=_TEST_ACCELERATOR_TYPE,
+            accelerator_count=_TEST_ACCELERATOR_COUNT,
+            service_account=_TEST_SERVICE_ACCOUNT,
+            sync=sync,
+            deploy_request_timeout=None,
+            autoscaling_target_accelerator_duty_cycle=70,
+        )
+
+        if not sync:
+            test_endpoint.wait()
+
+        expected_machine_spec = gca_machine_resources.MachineSpec(
+            machine_type=_TEST_MACHINE_TYPE,
+            accelerator_type=_TEST_ACCELERATOR_TYPE,
+            accelerator_count=_TEST_ACCELERATOR_COUNT,
+        )
+
+        expected_autoscaling_metric_spec = gca_machine_resources.AutoscalingMetricSpec(
+            metric_name=_TEST_METRIC_NAME_GPU_UTILIZATION,
+            target=70,
+        )
+
+        expected_dedicated_resources = gca_machine_resources.DedicatedResources(
+            machine_spec=expected_machine_spec,
+            min_replica_count=1,
+            max_replica_count=1,
+        )
+        expected_dedicated_resources.autoscaling_metric_specs.extend(
+            [expected_autoscaling_metric_spec]
+        )
+
+        expected_deployed_model = gca_endpoint.DeployedModel(
+            dedicated_resources=expected_dedicated_resources,
+            model=test_model.resource_name,
+            display_name=None,
+            service_account=_TEST_SERVICE_ACCOUNT,
+        )
+        deploy_model_mock.assert_called_once_with(
+            endpoint=test_endpoint.resource_name,
+            deployed_model=expected_deployed_model,
+            traffic_split={"0": 100},
+            metadata=(),
+            timeout=None,
+        )
+
+    @pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock")
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_deploy_with_autoscaling_target_accelerator_duty_cycle_and_no_accelerator_type_or_count_raises(
+        self, sync
+    ):
+        with pytest.raises(ValueError):
+            test_endpoint = models.Endpoint(_TEST_ENDPOINT_NAME)
+            test_model = models.Model(_TEST_ID)
+            test_model._gca_resource.supported_deployment_resources_types.append(
+                aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES
+            )
+            test_endpoint.deploy(
+                model=test_model,
+                sync=sync,
+                autoscaling_target_accelerator_duty_cycle=70,
+            )
+
+            if not sync:
+                test_endpoint.wait()
+
     @pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock")
     @pytest.mark.parametrize("sync", [True, False])
     def test_deploy_with_explanations(self, deploy_model_with_explanations_mock, sync):
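
Usage sketch (illustrative, not part of the patch): how the new parameters flow through Model.deploy. The project, location, and model resource name below are hypothetical placeholders; the parameter names and the ValueError behavior come from the diff above.

    from google.cloud import aiplatform

    # Hypothetical project/region/model values, for illustration only.
    aiplatform.init(project="my-project", location="us-central1")
    model = aiplatform.Model("projects/my-project/locations/us-central1/models/1234567890")

    # CPU-based autoscaling: scale between 1 and 4 replicas toward 70% CPU utilization.
    endpoint = model.deploy(
        machine_type="n1-standard-4",
        min_replica_count=1,
        max_replica_count=4,
        autoscaling_target_cpu_utilization=70,
    )

    # GPU duty-cycle autoscaling: accelerator_type and accelerator_count must be set,
    # otherwise _deploy_call raises the ValueError added in this change.
    endpoint = model.deploy(
        machine_type="n1-standard-4",
        accelerator_type="NVIDIA_TESLA_P100",
        accelerator_count=2,
        min_replica_count=1,
        max_replica_count=4,
        autoscaling_target_accelerator_duty_cycle=70,
    )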