From 572a27c7929e5686b61950e09e17134564987d50 Mon Sep 17 00:00:00 2001 From: Vinny Senthil Date: Wed, 16 Jun 2021 19:16:54 -0700 Subject: [PATCH] feat: Expose additional attributes into Vertex SDK to close gap with GAPIC (#477) * Add most missing fields * Add tests for get trainingjob subclass * Drop Dataset len, add more attrs, update docstrings * flake8 lint * Address reviewer comments * Switch 'an' to 'a' when referencing Vertex AI * Address comments, move base attrs to subclasses * Drop unused import * Add test to ensure supported training schemas are always unique * Address reviewer comments --- google/cloud/aiplatform/base.py | 21 ++- google/cloud/aiplatform/initializer.py | 2 +- google/cloud/aiplatform/jobs.py | 83 ++++++++++- google/cloud/aiplatform/models.py | 130 ++++++++++++++++- google/cloud/aiplatform/training_jobs.py | 151 ++++++++++++++++---- tests/unit/aiplatform/test_models.py | 47 ++++++ tests/unit/aiplatform/test_training_jobs.py | 34 +++++ 7 files changed, 428 insertions(+), 40 deletions(-) diff --git a/google/cloud/aiplatform/base.py b/google/cloud/aiplatform/base.py index 07e4c2fe4a..732e6b9acf 100644 --- a/google/cloud/aiplatform/base.py +++ b/google/cloud/aiplatform/base.py @@ -42,7 +42,7 @@ from google.auth import credentials as auth_credentials from google.cloud.aiplatform import initializer from google.cloud.aiplatform import utils - +from google.cloud.aiplatform.compat.types import encryption_spec as gca_encryption_spec logging.basicConfig(level=logging.INFO, stream=sys.stdout) @@ -563,6 +563,23 @@ def update_time(self) -> datetime.datetime: self._sync_gca_resource() return self._gca_resource.update_time + @property + def encryption_spec(self) -> Optional[gca_encryption_spec.EncryptionSpec]: + """Customer-managed encryption key options for this Vertex AI resource. + + If this is set, then all resources created by this Vertex AI resource will + be encrypted with the provided encryption key. 
+ """ + return getattr(self._gca_resource, "encryption_spec") + + @property + def labels(self) -> Dict[str, str]: + """User-defined labels containing metadata about this resource. + + Read more about labels at https://goo.gl/xmQnxf + """ + return self._gca_resource.labels + @property def gca_resource(self) -> proto.Message: """The underlying resource proto represenation.""" @@ -813,7 +830,7 @@ def _construct_sdk_resource_from_gapic( Args: gapic_resource (proto.Message): - A GAPIC representation of an Vertex AI resource, usually + A GAPIC representation of a Vertex AI resource, usually retrieved by a get_* or in a list_* API call. project (str): Optional. Project to construct SDK object from. If not set, diff --git a/google/cloud/aiplatform/initializer.py b/google/cloud/aiplatform/initializer.py index 4f57115fe7..ea1a51c8a7 100644 --- a/google/cloud/aiplatform/initializer.py +++ b/google/cloud/aiplatform/initializer.py @@ -267,7 +267,7 @@ def create_client( Args: client_class (utils.VertexAiServiceClientWithOverride): - (Required) An Vertex AI Service Client with optional overrides. + (Required) A Vertex AI Service Client with optional overrides. credentials (auth_credentials.Credentials): Custom auth credentials. If not provided will use the current config. location_override (str): Optional location override. 
diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py index 89ea52097d..376c6245ba 100644 --- a/google/cloud/aiplatform/jobs.py +++ b/google/cloud/aiplatform/jobs.py @@ -19,6 +19,7 @@ import abc import copy +import datetime import sys import time import logging @@ -28,6 +29,7 @@ from google.auth import credentials as auth_credentials from google.protobuf import duration_pb2 # type: ignore +from google.rpc import status_pb2 from google.cloud import aiplatform from google.cloud.aiplatform import base @@ -45,6 +47,7 @@ batch_prediction_job as gca_bp_job_compat, batch_prediction_job_v1 as gca_bp_job_v1, batch_prediction_job_v1beta1 as gca_bp_job_v1beta1, + completion_stats as gca_completion_stats, custom_job as gca_custom_job_compat, custom_job_v1beta1 as gca_custom_job_v1beta1, explanation_v1beta1 as gca_explanation_v1beta1, @@ -139,6 +142,27 @@ def state(self) -> gca_job_state.JobState: return self._gca_resource.state + @property + def start_time(self) -> Optional[datetime.datetime]: + """Time when the Job resource entered the `JOB_STATE_RUNNING` for the + first time.""" + self._sync_gca_resource() + return getattr(self._gca_resource, "start_time") + + @property + def end_time(self) -> Optional[datetime.datetime]: + """Time when the Job resource entered the `JOB_STATE_SUCCEEDED`, + `JOB_STATE_FAILED`, or `JOB_STATE_CANCELLED` state.""" + self._sync_gca_resource() + return getattr(self._gca_resource, "end_time") + + @property + def error(self) -> Optional[status_pb2.Status]: + """Detailed error info for this Job resource. 
Only populated when the + Job's state is `JOB_STATE_FAILED` or `JOB_STATE_CANCELLED`.""" + self._sync_gca_resource() + return getattr(self._gca_resource, "error") + @property @abc.abstractmethod def _job_type(cls) -> str: @@ -302,6 +326,27 @@ def __init__( credentials=credentials, ) + @property + def output_info(self,) -> Optional[aiplatform.gapic.BatchPredictionJob.OutputInfo]: + """Information describing the output of this job, including output location + into which prediction output is written. + + This is only available for batch prediction jobs that have run successfully. + """ + return self._gca_resource.output_info + + @property + def partial_failures(self) -> Optional[Sequence[status_pb2.Status]]: + """Partial failures encountered. For example, single files that can't be read. + This field never exceeds 20 entries. Status details fields contain standard + GCP error details.""" + return getattr(self._gca_resource, "partial_failures") + + @property + def completion_stats(self) -> Optional[gca_completion_stats.CompletionStats]: + """Statistics on completed and failed prediction instances.""" + return getattr(self._gca_resource, "completion_stats") + @classmethod def create( cls, @@ -842,7 +887,7 @@ def get( location: Optional[str] = None, credentials: Optional[auth_credentials.Credentials] = None, ) -> "_RunnableJob": - """Get an Vertex AI Job for the given resource_name. + """Get a Vertex AI Job for the given resource_name. Args: resource_name (str): @@ -858,7 +903,7 @@ def get( credentials set in aiplatform.init. Returns: - An Vertex AI Job. + A Vertex AI Job. 
""" self = cls._empty_constructor( project=project, @@ -887,7 +932,7 @@ class CustomJob(_RunnableJob): _resource_noun = "customJobs" _getter_method = "get_custom_job" - _list_method = "list_custom_job" + _list_method = "list_custom_jobs" _cancel_method = "cancel_custom_job" _delete_method = "delete_custom_job" _job_type = "training" @@ -987,6 +1032,20 @@ def __init__( ), ) + @property + def network(self) -> Optional[str]: + """The full name of the Google Compute Engine + [network](https://cloud.google.com/vpc/docs/vpc#networks) to which this + CustomJob should be peered. + + Takes the format `projects/{project}/global/networks/{network}`. Where + {project} is a project number, as in `12345`, and {network} is a network name. + + Private services access must already be configured for the network. If left + unspecified, the CustomJob is not peered with any network. + """ + return getattr(self._gca_resource, "network") + @classmethod def from_local_script( cls, @@ -1157,7 +1216,7 @@ def run( distributed training jobs that are not resilient to workers leaving and joining a job. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: @@ -1444,6 +1503,20 @@ def __init__( ), ) + @property + def network(self) -> Optional[str]: + """The full name of the Google Compute Engine + [network](https://cloud.google.com/vpc/docs/vpc#networks) to which this + HyperparameterTuningJob should be peered. + + Takes the format `projects/{project}/global/networks/{network}`. Where + {project} is a project number, as in `12345`, and {network} is a network name. + + Private services access must already be configured for the network. If left + unspecified, the HyperparameterTuningJob is not peered with any network. 
+ """ + return getattr(self._gca_resource.trial_job_spec, "network") + @base.optional_sync() def run( self, @@ -1473,7 +1546,7 @@ def run( distributed training jobs that are not resilient to workers leaving and joining a job. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index b93f569eaa..b287581431 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -18,8 +18,10 @@ from typing import Dict, List, NamedTuple, Optional, Sequence, Tuple, Union from google.api_core import operation +from google.api_core import exceptions as api_exceptions from google.auth import credentials as auth_credentials +from google.cloud import aiplatform from google.cloud.aiplatform import base from google.cloud.aiplatform import compat from google.cloud.aiplatform import explain @@ -119,6 +121,33 @@ def __init__( credentials=credentials, ) + @property + def traffic_split(self) -> Dict[str, int]: + """A map from a DeployedModel's ID to the percentage of this Endpoint's + traffic that should be forwarded to that DeployedModel. + + If a DeployedModel's ID is not listed in this map, then it receives no traffic. + + The traffic percentage values must add up to 100, or map must be empty if + the Endpoint is to not accept any traffic at a moment. + """ + self._sync_gca_resource() + return dict(self._gca_resource.traffic_split) + + @property + def network(self) -> Optional[str]: + """The full name of the Google Compute Engine + [network](https://cloud.google.com/vpc/docs/vpc#networks) to which this + Endpoint should be peered. + + Takes the format `projects/{project}/global/networks/{network}`. Where + {project} is a project number, as in `12345`, and {network} is a network name. 
+ + Private services access must already be configured for the network. If left + unspecified, the Endpoint is not peered with any network. + """ + return getattr(self._gca_resource, "network") + @classmethod def create( cls, @@ -1211,12 +1240,13 @@ class Model(base.VertexAiResourceNounWithFutureManager): _delete_method = "delete_model" @property - def uri(self): - """Uri of the model.""" - return self._gca_resource.artifact_uri + def uri(self) -> Optional[str]: + """Path to the directory containing the Model artifact and any of its + supporting files. Not present for AutoML Models.""" + return self._gca_resource.artifact_uri or None @property - def description(self): + def description(self) -> str: """Description of the model.""" return self._gca_resource.description @@ -1240,6 +1270,98 @@ def supported_export_formats( for export_format in self._gca_resource.supported_export_formats } + @property + def supported_deployment_resources_types( + self, + ) -> List[aiplatform.gapic.Model.DeploymentResourcesType]: + """List of deployment resource types accepted for this Model. + + When this Model is deployed, its prediction resources are described by + the `prediction_resources` field of the objects returned by + `Endpoint.list_models()`. Because not all Models support all resource + configuration types, the configuration types this Model supports are + listed here. + + If no configuration types are listed, the Model cannot be + deployed to an `Endpoint` and does not support online predictions + (`Endpoint.predict()` or `Endpoint.explain()`). Such a Model can serve + predictions by using a `BatchPredictionJob`, if it has at least one entry + each in `Model.supported_input_storage_formats` and + `Model.supported_output_storage_formats`.""" + return list(self._gca_resource.supported_deployment_resources_types) + + @property + def supported_input_storage_formats(self) -> List[str]: + """The formats this Model supports in the `input_config` field of a + `BatchPredictionJob`. 
If `Model.predict_schemata.instance_schema_uri` + exists, the instances should be given as per that schema. + + [Read the docs for more on batch prediction formats](https://cloud.google.com/vertex-ai/docs/predictions/batch-predictions#batch_request_input) + + If this Model doesn't support any of these formats it means it cannot be + used with a `BatchPredictionJob`. However, if it has + `supported_deployment_resources_types`, it could serve online predictions + by using `Endpoint.predict()` or `Endpoint.explain()`. + """ + return list(self._gca_resource.supported_input_storage_formats) + + @property + def supported_output_storage_formats(self) -> List[str]: + """The formats this Model supports in the `output_config` field of a + `BatchPredictionJob`. + + If both `Model.predict_schemata.instance_schema_uri` and + `Model.predict_schemata.prediction_schema_uri` exist, the predictions + are returned together with their instances. In other words, the + prediction has the original instance data first, followed by the actual + prediction content (as per the schema). + + [Read the docs for more on batch prediction formats](https://cloud.google.com/vertex-ai/docs/predictions/batch-predictions) + + If this Model doesn't support any of these formats it means it cannot be + used with a `BatchPredictionJob`. However, if it has + `supported_deployment_resources_types`, it could serve online predictions + by using `Endpoint.predict()` or `Endpoint.explain()`. + """ + return list(self._gca_resource.supported_output_storage_formats) + + @property + def predict_schemata(self) -> Optional[aiplatform.gapic.PredictSchemata]: + """The schemata that describe formats of the Model's predictions and + explanations, if available.""" + return getattr(self._gca_resource, "predict_schemata") + + @property + def training_job(self) -> Optional["aiplatform.training_jobs._TrainingJob"]: + """The TrainingJob that uploaded this Model, if any. 
+ + Raises: + api_core.exceptions.NotFound: If the Model's training job resource + cannot be found on the Vertex service. + """ + job_name = getattr(self._gca_resource, "training_pipeline") + + if not job_name: + return None + + try: + return aiplatform.training_jobs._TrainingJob._get_and_return_subclass( + resource_name=job_name, + project=self.project, + location=self.location, + credentials=self.credentials, + ) + except api_exceptions.NotFound: + raise api_exceptions.NotFound( + f"The training job used to create this model could not be found: {job_name}" + ) + + @property + def container_spec(self) -> Optional[aiplatform.gapic.ModelContainerSpec]: + """The specification of the container that is to be used when deploying + this Model. Not present for AutoML Models.""" + return getattr(self._gca_resource, "container_spec") + def __init__( self, model_name: str, diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index cf33b23ba4..99f4f088a5 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -15,6 +15,7 @@ # limitations under the License. 
# +import datetime import time from typing import Dict, List, Optional, Sequence, Tuple, Union @@ -46,6 +47,7 @@ ) from google.rpc import code_pb2 +from google.rpc import status_pb2 import proto @@ -136,9 +138,30 @@ def __init__( @abc.abstractmethod def _supported_training_schemas(cls) -> Tuple[str]: """List of supported schemas for this training job.""" - pass + @property + def start_time(self) -> Optional[datetime.datetime]: + """Time when the TrainingJob entered the `PIPELINE_STATE_RUNNING` for + the first time.""" + self._sync_gca_resource() + return getattr(self._gca_resource, "start_time") + + @property + def end_time(self) -> Optional[datetime.datetime]: + """Time when the TrainingJob resource entered the `PIPELINE_STATE_SUCCEEDED`, + `PIPELINE_STATE_FAILED`, or `PIPELINE_STATE_CANCELLED` state.""" + self._sync_gca_resource() + return getattr(self._gca_resource, "end_time") + + @property + def error(self) -> Optional[status_pb2.Status]: + """Detailed error info for this TrainingJob resource. Only populated when + the TrainingJob's state is `PIPELINE_STATE_FAILED` or + `PIPELINE_STATE_CANCELLED`.""" + self._sync_gca_resource() + return getattr(self._gca_resource, "error") + @classmethod def get( cls, @@ -153,10 +176,10 @@ def get( resource_name (str): Required. A fully-qualified resource name or ID. project (str): - Optional project to retrieve dataset from. If not set, project + Optional project to retrieve training job from. If not set, project set in aiplatform.init will be used. location (str): - Optional location to retrieve dataset from. If not set, location + Optional location to retrieve training job from. If not set, location set in aiplatform.init will be used. credentials (auth_credentials.Credentials): Custom credentials to use to upload this model. Overrides @@ -167,7 +190,7 @@ def get( doesn't match the custom training task definition. 
Returns: - An Vertex AI Training Job + A Vertex AI Training Job """ # Create job with dummy parameters @@ -194,11 +217,68 @@ def get( return self + @classmethod + def _get_and_return_subclass( + cls, + resource_name: str, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + ) -> "_TrainingJob": + """Retrieve Training Job subclass for the given resource_name without + knowing the training_task_definition. + + Example usage: + ``` + aiplatform.training_jobs._TrainingJob._get_and_return_subclass( + 'projects/.../locations/.../trainingPipelines/12345' + ) + # Returns: + ``` + + Args: + resource_name (str): + Required. A fully-qualified resource name or ID. + project (str): + Optional project to retrieve training job from. If not set, project + set in aiplatform.init will be used. + location (str): + Optional location to retrieve training job from. If not set, location + set in aiplatform.init will be used. + credentials (auth_credentials.Credentials): + Optional. Custom credentials to use to upload this model. Overrides + credentials set in aiplatform.init. 
+ + Returns: + A Vertex AI Training Job + """ + + # Retrieve training pipeline resource before class construction + client = cls._instantiate_client(location=location, credentials=credentials) + + gca_training_pipeline = getattr(client, cls._getter_method)(name=resource_name) + + schema_uri = gca_training_pipeline.training_task_definition + + # Collect all AutoML training job classes and CustomTrainingJob + class_list = [ + c for c in cls.__subclasses__() if c.__name__.startswith("AutoML") + ] + [CustomTrainingJob] + + # Identify correct training job subclass, construct and return object + for c in class_list: + if schema_uri in c._supported_training_schemas: + return c._empty_constructor( + project=project, + location=location, + credentials=credentials, + resource_name=resource_name, + ) + @property @abc.abstractmethod def _model_upload_fail_string(self) -> str: """Helper property for model upload failure.""" - pass @abc.abstractmethod @@ -595,7 +675,7 @@ def _get_model(self) -> Optional[models.Model]: """Helper method to get and instantiate the Model to Upload. Returns: - model: Vertex AI Model if training succeeded and produced an Vertex AI + model: Vertex AI Model if training succeeded and produced a Vertex AI Model. None otherwise. Raises: @@ -1009,6 +1089,21 @@ def __init__( # this flags keeps that state so we don't log it multiple times self._has_logged_custom_job = False + @property + def network(self) -> Optional[str]: + """The full name of the Google Compute Engine + [network](https://cloud.google.com/vpc/docs/vpc#networks) to which this + CustomTrainingJob should be peered. + + Takes the format `projects/{project}/global/networks/{network}`. Where + {project} is a project number, as in `12345`, and {network} is a network name. + + Private services access must already be configured for the network. If left + unspecified, the CustomTrainingJob is not peered with any network. 
+ """ + # Return `network` value in training task inputs if set in Map + return self._gca_resource.training_task_inputs.get("network") + def _prepare_and_validate_run( self, model_display_name: Optional[str] = None, @@ -1107,7 +1202,7 @@ def _prepare_training_task_inputs_and_output_dir( Private services access must already be configured for the network. If left unspecified, the job is not peered with any network. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: @@ -1575,7 +1670,7 @@ def run( Supported only for tabular and time series Datasets. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: @@ -1596,7 +1691,7 @@ def run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ worker_pool_specs, managed_model = self._prepare_and_validate_run( model_display_name=model_display_name, @@ -1745,7 +1840,7 @@ def _run( Supported only for tabular and time series Datasets. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: @@ -1766,7 +1861,7 @@ def _run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ package_gcs_uri = python_packager.package_and_copy_to_gcs( gcs_staging_dir=self._staging_bucket, @@ -2190,7 +2285,7 @@ def run( Supported only for tabular and time series Datasets. tensorboard (str): - Optional. The name of an Vertex AI + Optional. 
The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: @@ -2211,7 +2306,7 @@ def run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. Raises: RuntimeError: If Training job has already been run, staging_bucket has not @@ -2354,7 +2449,7 @@ def _run( Supported only for tabular and time series Datasets. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: @@ -2375,7 +2470,7 @@ def _run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ for spec in worker_pool_specs: @@ -2641,7 +2736,7 @@ def run( be immediately returned and synced when the Future has completed. Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. Raises: RuntimeError: If Training job has already been run or is waiting to run. @@ -2759,7 +2854,7 @@ def _run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ training_task_definition = schema.training_job.definition.automl_tabular @@ -3043,7 +3138,7 @@ def run( be immediately returned and synced when the Future has completed. Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. Raises: RuntimeError if Training job has already been run or is waiting to run. @@ -3228,7 +3323,7 @@ def _run( be immediately returned and synced when the Future has completed. 
Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ training_task_definition = schema.training_job.definition.automl_forecasting @@ -3522,7 +3617,7 @@ def run( be immediately returned and synced when the Future has completed. Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. Raises: RuntimeError: If Training job has already been run or is waiting to run. @@ -3623,7 +3718,7 @@ def _run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ # Retrieve the objective-specific training task schema based on prediction_type @@ -4059,7 +4154,7 @@ def run( Supported only for tabular and time series Datasets. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: @@ -4080,7 +4175,7 @@ def run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ worker_pool_specs, managed_model = self._prepare_and_validate_run( model_display_name=model_display_name, @@ -4205,7 +4300,7 @@ def _run( Supported only for tabular and time series Datasets. tensorboard (str): - Optional. The name of an Vertex AI + Optional. The name of a Vertex AI [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] resource to which this CustomJob will upload Tensorboard logs. Format: @@ -4226,7 +4321,7 @@ def _run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. 
""" for spec in worker_pool_specs: spec["python_package_spec"] = { @@ -4433,7 +4528,7 @@ def run( be immediately returned and synced when the Future has completed. Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. Raises: RuntimeError: If Training job has already been run or is waiting to run. @@ -4497,7 +4592,7 @@ def _run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ # Retrieve the objective-specific training task schema based on prediction_type @@ -4775,7 +4870,7 @@ def _run( Returns: model: The trained Vertex AI Model resource or None if training did not - produce an Vertex AI Model. + produce a Vertex AI Model. """ if model_display_name is None: diff --git a/tests/unit/aiplatform/test_models.py b/tests/unit/aiplatform/test_models.py index ad84fde65b..be4f7f61bd 100644 --- a/tests/unit/aiplatform/test_models.py +++ b/tests/unit/aiplatform/test_models.py @@ -19,8 +19,10 @@ from concurrent import futures import pytest from unittest import mock +from unittest.mock import patch from google.api_core import operation as ga_operation +from google.api_core import exceptions as api_exceptions from google.auth import credentials as auth_credentials from google.cloud import aiplatform @@ -58,6 +60,7 @@ from google.cloud.aiplatform_v1.services.model_service import ( client as model_service_client, ) +from google.cloud.aiplatform.compat.services import pipeline_service_client from google.cloud.aiplatform_v1.types import ( batch_prediction_job as gca_batch_prediction_job, io as gca_io, @@ -100,6 +103,10 @@ _TEST_STARTING_REPLICA_COUNT = 2 _TEST_MAX_REPLICA_COUNT = 12 +_TEST_PIPELINE_RESOURCE_NAME = ( + "projects/my-project/locations/us-central1/trainingPipeline/12345" +) + _TEST_BATCH_PREDICTION_GCS_SOURCE = "gs://example-bucket/folder/instance.jsonl" _TEST_BATCH_PREDICTION_GCS_SOURCE_LIST 
= [ "gs://example-bucket/folder/instance1.jsonl", @@ -252,6 +259,19 @@ def get_model_with_custom_project_mock(): yield get_model_mock +@pytest.fixture +def get_model_with_training_job(): + with mock.patch.object( + model_service_client.ModelServiceClient, "get_model" + ) as get_model_mock: + get_model_mock.return_value = gca_model.Model( + display_name=_TEST_MODEL_NAME, + name=_TEST_MODEL_RESOURCE_NAME_CUSTOM_PROJECT, + training_pipeline=_TEST_PIPELINE_RESOURCE_NAME, + ) + yield get_model_mock + + @pytest.fixture def get_model_with_supported_export_formats_image(): with mock.patch.object( @@ -457,6 +477,16 @@ def create_batch_prediction_job_with_explanations_mock(): yield create_batch_prediction_job_mock +@pytest.fixture +def get_training_job_non_existent_mock(): + with patch.object( + pipeline_service_client.PipelineServiceClient, "get_training_pipeline" + ) as get_training_job_non_existent_mock: + get_training_job_non_existent_mock.side_effect = api_exceptions.NotFound("404") + + yield get_training_job_non_existent_mock + + @pytest.fixture def create_client_mock(): with mock.patch.object( @@ -1384,3 +1414,20 @@ def test_export_model_as_artifact_with_invalid_args(self, export_model_mock, syn assert e.match( regexp=r"This model can not be exported as a container image." 
) + + @pytest.mark.usefixtures( + "get_training_job_non_existent_mock", "get_model_with_training_job" + ) + def test_get_and_return_subclass_not_found(self): + test_model = models.Model(_TEST_ID) + + # Attempt to access Model's training job that no longer exists + with pytest.raises(api_exceptions.NotFound) as e: + test_model.training_job + + assert e.match( + regexp=( + r"The training job used to create this model could not be found: " + fr"{_TEST_PIPELINE_RESOURCE_NAME}" + ) + ) diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py index cd9a6a4033..b160204d7d 100644 --- a/tests/unit/aiplatform/test_training_jobs.py +++ b/tests/unit/aiplatform/test_training_jobs.py @@ -1550,6 +1550,40 @@ def test_get_training_job_with_project_and_alt_location(self): location=_TEST_ALT_LOCATION, ) + def test_unique_supported_training_schemas(self): + """Ensure that the `_supported_training_schemas` across AutoML training + classes and CustomTrainingJob contain unique values.""" + + schemas = [ + schema + for c in aiplatform.training_jobs._TrainingJob.__subclasses__() + for schema in c._supported_training_schemas + if c.__name__.startswith("AutoML") + ] + + schemas.extend( + aiplatform.training_jobs.CustomTrainingJob._supported_training_schemas + ) + + # Ensure all schemas across classes are unique + assert len(set(schemas)) == len(schemas) + + @pytest.mark.usefixtures("get_training_job_tabular_mock") + def test_get_and_return_subclass_automl(self): + subcls = aiplatform.training_jobs._TrainingJob._get_and_return_subclass( + resource_name=_TEST_PIPELINE_RESOURCE_NAME + ) + + assert isinstance(subcls, aiplatform.training_jobs.AutoMLTabularTrainingJob) + + @pytest.mark.usefixtures("get_training_job_custom_mock") + def test_get_and_return_subclass_custom(self): + subcls = aiplatform.training_jobs._TrainingJob._get_and_return_subclass( + resource_name=_TEST_PIPELINE_RESOURCE_NAME + ) + + assert isinstance(subcls, 
aiplatform.training_jobs.CustomTrainingJob) + @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_nontabular_dataset( self,