diff --git a/google/cloud/aiplatform/preview/jobs.py b/google/cloud/aiplatform/preview/jobs.py
index 7ba408db95..22838207ea 100644
--- a/google/cloud/aiplatform/preview/jobs.py
+++ b/google/cloud/aiplatform/preview/jobs.py
@@ -28,6 +28,8 @@ from google.cloud.aiplatform import utils
 from google.cloud.aiplatform.compat.types import (
     custom_job_v1beta1 as gca_custom_job_compat,
+    job_state as gca_job_state,
+    job_state_v1beta1 as gca_job_state_v1beta1,
 )
 from google.cloud.aiplatform.compat.types import (
     execution_v1beta1 as gcs_execution_compat,
@@ -42,6 +44,24 @@
 _LOGGER = base.Logger(__name__)
 
 _DEFAULT_RETRY = retry.Retry()
 
+# TODO(b/242108750): remove temporary logic once model monitoring for batch prediction is GA
+_JOB_COMPLETE_STATES = (
+    gca_job_state.JobState.JOB_STATE_SUCCEEDED,
+    gca_job_state.JobState.JOB_STATE_FAILED,
+    gca_job_state.JobState.JOB_STATE_CANCELLED,
+    gca_job_state.JobState.JOB_STATE_PAUSED,
+    gca_job_state_v1beta1.JobState.JOB_STATE_SUCCEEDED,
+    gca_job_state_v1beta1.JobState.JOB_STATE_FAILED,
+    gca_job_state_v1beta1.JobState.JOB_STATE_CANCELLED,
+    gca_job_state_v1beta1.JobState.JOB_STATE_PAUSED,
+)
+
+_JOB_ERROR_STATES = (
+    gca_job_state.JobState.JOB_STATE_FAILED,
+    gca_job_state.JobState.JOB_STATE_CANCELLED,
+    gca_job_state_v1beta1.JobState.JOB_STATE_FAILED,
+    gca_job_state_v1beta1.JobState.JOB_STATE_CANCELLED,
+)
 
 
 class CustomJob(jobs.CustomJob):
diff --git a/google/cloud/aiplatform/preview/resource_pool_utils.py b/google/cloud/aiplatform/preview/resource_pool_utils.py
new file mode 100644
index 0000000000..1a60783b5d
--- /dev/null
+++ b/google/cloud/aiplatform/preview/resource_pool_utils.py
@@ -0,0 +1,94 @@
+# -*- coding: utf-8 -*-
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import NamedTuple, Optional, Dict, Union
+
+from google.cloud.aiplatform import utils
+from google.cloud.aiplatform.compat.types import (
+    accelerator_type_v1beta1 as gca_accelerator_type_compat,
+)
+
+
+class _ResourcePool(NamedTuple):
+    """Specification container for resource pool specs used by persistent resources.
+
+    Usage:
+
+    resource_pool = _ResourcePool(
+        replica_count=1,
+        machine_type='n1-standard-4',
+        accelerator_count=1,
+        accelerator_type='NVIDIA_TESLA_K80',
+        boot_disk_type='pd-ssd',
+        boot_disk_size_gb=100,
+    )
+
+    Note that container and python package specs are not stored with this spec.
+    """
+
+    replica_count: int = 1
+    machine_type: str = "n1-standard-4"
+    accelerator_count: int = 0
+    accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED"
+    boot_disk_type: str = "pd-ssd"
+    boot_disk_size_gb: int = 100
+
+    def _get_accelerator_type(self) -> Optional[str]:
+        """Validates accelerator_type and returns the name of the accelerator.
+
+        Returns:
+            None if no accelerator or valid accelerator name.
+
+        Raises:
+            ValueError if accelerator type is invalid.
+ """ + + # Raises ValueError if invalid accelerator_type + utils.validate_accelerator_type(self.accelerator_type) + + accelerator_enum = getattr( + gca_accelerator_type_compat.AcceleratorType, self.accelerator_type + ) + + if ( + accelerator_enum + != gca_accelerator_type_compat.AcceleratorType.ACCELERATOR_TYPE_UNSPECIFIED + ): + return self.accelerator_type + + @property + def spec_dict(self) -> Dict[str, Union[int, str, Dict[str, Union[int, str]]]]: + """Return specification as a Dict.""" + spec = { + "machine_spec": {"machine_type": self.machine_type}, + "replica_count": self.replica_count, + "disk_spec": { + "boot_disk_type": self.boot_disk_type, + "boot_disk_size_gb": self.boot_disk_size_gb, + }, + } + + accelerator_type = self._get_accelerator_type() + if accelerator_type and self.accelerator_count: + spec["machine_spec"]["accelerator_type"] = accelerator_type + spec["machine_spec"]["accelerator_count"] = self.accelerator_count + + return spec + + @property + def is_empty(self) -> bool: + """Returns True is replica_count > 0 False otherwise.""" + return self.replica_count <= 0 diff --git a/tests/unit/vertexai/conftest.py b/tests/unit/vertexai/conftest.py index 4994248eed..538f3e7cf9 100644 --- a/tests/unit/vertexai/conftest.py +++ b/tests/unit/vertexai/conftest.py @@ -38,10 +38,13 @@ from google.cloud.aiplatform_v1beta1.services.persistent_resource_service import ( PersistentResourceServiceClient, ) -import constants as test_constants from pyfakefs import fake_filesystem_unittest import pytest import tensorflow.saved_model as tf_saved_model +from google.cloud.aiplatform_v1beta1.types.persistent_resource import ( + PersistentResource, + ResourcePool, +) _TEST_PROJECT = "test-project" @@ -83,6 +86,18 @@ labels={"trained_by_vertex_ai": "true"}, ) +_TEST_REQUEST_RUNNING_DEFAULT = PersistentResource() +resource_pool = ResourcePool() +resource_pool.machine_spec.machine_type = "n1-standard-4" +resource_pool.replica_count = 1 +resource_pool.disk_spec.boot_disk_type = "pd-ssd" +resource_pool.disk_spec.boot_disk_size_gb = 100 +_TEST_REQUEST_RUNNING_DEFAULT.resource_pools = [resource_pool] + + +_TEST_PERSISTENT_RESOURCE_RUNNING = PersistentResource() +_TEST_PERSISTENT_RESOURCE_RUNNING.state = "RUNNING" + @pytest.fixture(scope="module") def google_auth_mock(): @@ -264,7 +279,7 @@ def persistent_resource_running_mock(): "get_persistent_resource", ) as persistent_resource_running_mock: persistent_resource_running_mock.return_value = ( - test_constants._TEST_PERSISTENT_RESOURCE_RUNNING + _TEST_PERSISTENT_RESOURCE_RUNNING ) yield persistent_resource_running_mock @@ -287,7 +302,7 @@ def create_persistent_resource_default_mock(): ) as create_persistent_resource_default_mock: create_persistent_resource_lro_mock = mock.Mock(ga_operation.Operation) create_persistent_resource_lro_mock.result.return_value = ( - test_constants._TEST_REQUEST_RUNNING_DEFAULT + _TEST_REQUEST_RUNNING_DEFAULT ) create_persistent_resource_default_mock.return_value = ( create_persistent_resource_lro_mock diff --git a/tests/unit/vertexai/constants.py b/tests/unit/vertexai/constants.py deleted file mode 100644 index deb469f3d7..0000000000 --- a/tests/unit/vertexai/constants.py +++ /dev/null @@ -1,45 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from vertexai.preview._workflow.shared import configs
-from google.cloud.aiplatform_v1beta1.types.persistent_resource import (
-    PersistentResource,
-    ResourcePool,
-)
-
-_TEST_PROJECT = "test-project"
-_TEST_LOCATION = "us-central1"
-_TEST_PARENT = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}"
-_TEST_CLUSTER_NAME = "test-cluster"
-_TEST_CLUSTER_CONFIG = configs.PersistentResourceConfig(name=_TEST_CLUSTER_NAME)
-_TEST_CLUSTER_RESOURCE_NAME = f"{_TEST_PARENT}/persistentResources/{_TEST_CLUSTER_NAME}"
-
-
-_TEST_PERSISTENT_RESOURCE_ERROR = PersistentResource()
-_TEST_PERSISTENT_RESOURCE_ERROR.state = "ERROR"
-
-# move to constants
-_TEST_REQUEST_RUNNING_DEFAULT = PersistentResource()
-resource_pool = ResourcePool()
-resource_pool.machine_spec.machine_type = "n1-standard-4"
-resource_pool.replica_count = 1
-resource_pool.disk_spec.boot_disk_type = "pd-ssd"
-resource_pool.disk_spec.boot_disk_size_gb = 100
-_TEST_REQUEST_RUNNING_DEFAULT.resource_pools = [resource_pool]
-
-
-_TEST_PERSISTENT_RESOURCE_RUNNING = PersistentResource()
-_TEST_PERSISTENT_RESOURCE_RUNNING.state = "RUNNING"
diff --git a/tests/unit/vertexai/test_persistent_resource_util.py b/tests/unit/vertexai/test_persistent_resource_util.py
index 109b675f35..b5e0635d32 100644
--- a/tests/unit/vertexai/test_persistent_resource_util.py
+++ b/tests/unit/vertexai/test_persistent_resource_util.py
@@ -19,10 +19,15 @@
 from google.api_core import operation as ga_operation
 from google.cloud import aiplatform
 import vertexai
+from vertexai.preview.developer import remote_specs
 from google.cloud.aiplatform_v1beta1.services.persistent_resource_service import (
     PersistentResourceServiceClient,
 )
 from google.cloud.aiplatform_v1beta1.types import persistent_resource_service
+from google.cloud.aiplatform_v1beta1.types.machine_resources import DiskSpec
+from google.cloud.aiplatform_v1beta1.types.machine_resources import (
+    MachineSpec,
+)
 from google.cloud.aiplatform_v1beta1.types.persistent_resource import (
     PersistentResource,
 )
@@ -48,55 +53,64 @@
 _TEST_PERSISTENT_RESOURCE_ERROR = PersistentResource()
 _TEST_PERSISTENT_RESOURCE_ERROR.state = "ERROR"
 
-_TEST_REQUEST_RUNNING_DEFAULT = PersistentResource()
-resource_pool = ResourcePool()
-resource_pool.machine_spec.machine_type = "n1-standard-4"
-resource_pool.replica_count = 1
-resource_pool.disk_spec.boot_disk_type = "pd-ssd"
-resource_pool.disk_spec.boot_disk_size_gb = 100
-_TEST_REQUEST_RUNNING_DEFAULT.resource_pools = [resource_pool]
-
+resource_pool_0 = ResourcePool(
+    machine_spec=MachineSpec(machine_type="n1-standard-4"),
+    disk_spec=DiskSpec(
+        boot_disk_type="pd-ssd",
+        boot_disk_size_gb=100,
+    ),
+    replica_count=1,
+)
+resource_pool_1 = ResourcePool(
+    machine_spec=MachineSpec(
+        machine_type="n1-standard-8",
+        accelerator_type="NVIDIA_TESLA_T4",
+        accelerator_count=1,
+    ),
+    disk_spec=DiskSpec(
+        boot_disk_type="pd-ssd",
+        boot_disk_size_gb=100,
+    ),
+    replica_count=2,
+)
+_TEST_REQUEST_RUNNING_DEFAULT = PersistentResource(
+    resource_pools=[resource_pool_0],
+)
+_TEST_REQUEST_RUNNING_CUSTOM = PersistentResource(
+    resource_pools=[resource_pool_0, resource_pool_1],
+)
 
 _TEST_PERSISTENT_RESOURCE_RUNNING = PersistentResource()
 _TEST_PERSISTENT_RESOURCE_RUNNING.state = "RUNNING"
 
-
-@pytest.fixture
-def persistent_resource_running_mock():
-    with mock.patch.object(
-        PersistentResourceServiceClient,
-        "get_persistent_resource",
-    ) as persistent_resource_running_mock:
-        persistent_resource_running_mock.return_value = (
-            _TEST_PERSISTENT_RESOURCE_RUNNING
-        )
-        yield persistent_resource_running_mock
-
-
-@pytest.fixture
-def persistent_resource_exception_mock():
-    with mock.patch.object(
-        PersistentResourceServiceClient,
-        "get_persistent_resource",
-    ) as persistent_resource_exception_mock:
-        persistent_resource_exception_mock.side_effect = Exception
-        yield persistent_resource_exception_mock
+# user-configured remote_specs.ResourcePool
+remote_specs_resource_pool_0 = remote_specs.ResourcePool(replica_count=1)
+remote_specs_resource_pool_1 = remote_specs.ResourcePool(
+    machine_type="n1-standard-8",
+    replica_count=2,
+    accelerator_type="NVIDIA_TESLA_T4",
+    accelerator_count=1,
+)
+_TEST_CUSTOM_RESOURCE_POOLS = [
+    remote_specs_resource_pool_0,
+    remote_specs_resource_pool_1,
+]
 
 
 @pytest.fixture
-def create_persistent_resource_default_mock():
+def create_persistent_resource_custom_mock():
     with mock.patch.object(
         PersistentResourceServiceClient,
         "create_persistent_resource",
-    ) as create_persistent_resource_default_mock:
+    ) as create_persistent_resource_custom_mock:
         create_persistent_resource_lro_mock = mock.Mock(ga_operation.Operation)
         create_persistent_resource_lro_mock.result.return_value = (
-            _TEST_REQUEST_RUNNING_DEFAULT
+            _TEST_REQUEST_RUNNING_CUSTOM
        )
-        create_persistent_resource_default_mock.return_value = (
+        create_persistent_resource_custom_mock.return_value = (
             create_persistent_resource_lro_mock
        )
-        yield create_persistent_resource_default_mock
+        yield create_persistent_resource_custom_mock
 
 
 @pytest.fixture
@@ -180,6 +194,25 @@ def test_create_persistent_resource_default_success(
             request,
         )
 
+    @pytest.mark.usefixtures("persistent_resource_running_mock")
+    def test_create_persistent_resource_custom_success(
+        self, create_persistent_resource_custom_mock
+    ):
+        persistent_resource_util.create_persistent_resource(
+            cluster_resource_name=_TEST_CLUSTER_RESOURCE_NAME,
+            resource_pools=_TEST_CUSTOM_RESOURCE_POOLS,
+        )
+
+        request = persistent_resource_service.CreatePersistentResourceRequest(
+            parent=_TEST_PARENT,
+            persistent_resource=_TEST_REQUEST_RUNNING_CUSTOM,
+            persistent_resource_id=_TEST_CLUSTER_NAME,
+        )
+
+        create_persistent_resource_custom_mock.assert_called_with(
+            request,
+        )
+
     @pytest.mark.usefixtures("create_persistent_resource_exception_mock")
     def test_create_ray_cluster_state_error(self):
         with pytest.raises(ValueError) as e:
diff --git a/tests/unit/vertexai/test_remote_training.py b/tests/unit/vertexai/test_remote_training.py
index 218ae74ef1..914b86d839 100644
--- a/tests/unit/vertexai/test_remote_training.py
+++ b/tests/unit/vertexai/test_remote_training.py
@@ -26,19 +26,24 @@
 from google.api_core import exceptions
 from google.cloud import aiplatform
 from google.cloud.aiplatform import utils
-from google.cloud.aiplatform.compat.services import job_service_client
+from google.cloud.aiplatform.compat.services import (
+    job_service_client_v1beta1 as job_service_client,
+)
+from google.cloud.aiplatform.compat.types import (
+    custom_job_v1beta1 as gca_custom_job_compat,
+)
 from google.cloud.aiplatform.compat.types import (
-    custom_job as gca_custom_job_compat,
+    execution_v1beta1 as gca_execution,
 )
-from google.cloud.aiplatform.compat.types import execution as gca_execution
-from google.cloud.aiplatform.compat.types import io as gca_io_compat
+from google.cloud.aiplatform.compat.types import io_v1beta1 as gca_io_compat
 from google.cloud.aiplatform.compat.types import (
-    job_state as gca_job_state_compat,
+    job_state_v1beta1 as gca_job_state_compat,
 )
 from google.cloud.aiplatform.compat.types import (
     tensorboard as gca_tensorboard,
 )
 from google.cloud.aiplatform.metadata import constants as metadata_constants
+from google.cloud.aiplatform.preview import resource_pool_utils
 from google.cloud.aiplatform_v1 import (
     Context as GapicContext,
     MetadataServiceClient,
@@ -135,13 +140,16 @@
     + _TEST_TRAINING_COMMAND
     + " --enable_autolog"
 )
-
+_TEST_REPLICA_COUNT = 1
+_TEST_MACHINE_TYPE = "n1-standard-4"
+_TEST_ACCELERATOR_TYPE = "NVIDIA_TESLA_K80"
+_TEST_ACCELERATOR_COUNT = 2
 _TEST_WORKER_POOL_SPEC = [
     {
         "machine_spec": {
-            "machine_type": "n1-standard-4",
+            "machine_type": _TEST_MACHINE_TYPE,
         },
-        "replica_count": 1,
+        "replica_count": _TEST_REPLICA_COUNT,
         "container_spec": {
             "image_uri": f"python:{supported_frameworks._get_python_minor_version()}",
             "command": ["sh", "-c"]
@@ -246,6 +254,20 @@
     is_default=True,
 )
 
+_TEST_PERSISTENT_RESOURCE_ID = "test-cluster"
+_TEST_PERSISTENT_RESOURCE_CONFIG = configs.PersistentResourceConfig(
+    name=_TEST_PERSISTENT_RESOURCE_ID,
+    resource_pools=[
+        remote_specs.ResourcePool(
+            replica_count=_TEST_REPLICA_COUNT,
+        ),
+        remote_specs.ResourcePool(
+            machine_type="n1-standard-8",
+            replica_count=2,
+        ),
+    ],
+)
+
 
 @pytest.fixture
 def list_default_tensorboard_mock():
@@ -277,6 +299,7 @@ def _get_custom_job_proto(
     model=None,
     user_requirements=False,
     custom_commands=False,
+    persistent_resource_id=None,
 ):
     job = copy.deepcopy(_TEST_CUSTOM_JOB_PROTO)
     if display_name:
@@ -388,6 +411,8 @@ def _get_custom_job_proto(
         env.append(
             {"name": metadata_constants.ENV_EXPERIMENT_RUN_KEY, "value": experiment_run}
         )
+    if persistent_resource_id:
+        job.job_spec.persistent_resource_id = persistent_resource_id
     job.labels = ({"trained_by_vertex_ai": "true"},)
     return job
 
@@ -1738,3 +1763,126 @@ def test_get_service_account_empty_sa_autolog_disabled(self):
         service_account = training._get_service_account(config, autolog=False)
 
         assert service_account is None
+
+    @pytest.mark.usefixtures(
+        "mock_timestamped_unique_name",
+        "mock_get_custom_job",
+        "mock_autolog_disabled",
+        "persistent_resource_running_mock",
+    )
+    def test_remote_training_sklearn_with_persistent_cluster(
+        self,
+        mock_any_serializer_sklearn,
+        mock_create_custom_job,
+    ):
+        vertexai.init(
+            project=_TEST_PROJECT,
+            location=_TEST_LOCATION,
+            staging_bucket=_TEST_BUCKET_NAME,
+        )
+        vertexai.preview.init(remote=True, cluster=_TEST_PERSISTENT_RESOURCE_CONFIG)
+
+        LogisticRegression = vertexai.preview.remote(_logistic.LogisticRegression)
+        model = LogisticRegression()
+
+        model.fit(_X_TRAIN, _Y_TRAIN)
+
+        # check that model is serialized correctly
+        mock_any_serializer_sklearn.return_value.serialize.assert_any_call(
+            to_serialize=model,
+            gcs_path=os.path.join(_TEST_REMOTE_JOB_BASE_PATH, "input/input_estimator"),
+        )
+
+        # check that args are serialized correctly
+        mock_any_serializer_sklearn.return_value.serialize.assert_any_call(
+            to_serialize=_X_TRAIN,
+            gcs_path=os.path.join(_TEST_REMOTE_JOB_BASE_PATH, "input/X"),
+        )
+        mock_any_serializer_sklearn.return_value.serialize.assert_any_call(
+            to_serialize=_Y_TRAIN,
+            gcs_path=os.path.join(_TEST_REMOTE_JOB_BASE_PATH, "input/y"),
+        )
+
+        # check that CustomJob is created correctly
+        expected_custom_job = _get_custom_job_proto(
+            persistent_resource_id=_TEST_PERSISTENT_RESOURCE_ID,
+        )
+        mock_create_custom_job.assert_called_once_with(
+            parent=_TEST_PARENT,
+            custom_job=expected_custom_job,
+            timeout=None,
+        )
+
+        # check that trained model is deserialized correctly
+        mock_any_serializer_sklearn.return_value.deserialize.assert_has_calls(
+            [
+                mock.call(
+                    os.path.join(_TEST_REMOTE_JOB_BASE_PATH, "output/output_estimator")
+                ),
+                mock.call(
+                    os.path.join(_TEST_REMOTE_JOB_BASE_PATH, "output/output_data")
+                ),
+            ]
+        )
+
+        # change to `vertexai.preview.init(remote=False)` to use local prediction
+        vertexai.preview.init(remote=False)
+
+        # check that local model is updated in place
+        # `model.score` raises NotFittedError if the model is not updated
+        model.score(_X_TEST, _Y_TEST)
+
+    @pytest.mark.usefixtures(
+        "list_default_tensorboard_mock",
+        "mock_get_experiment_run",
+        "mock_get_metadata_store",
+        "get_artifact_not_found_mock",
+        "update_context_mock",
+        "aiplatform_autolog_mock",
+        "mock_autolog_enabled",
+        "persistent_resource_running_mock",
+    )
+    def test_remote_training_sklearn_with_persistent_cluster_and_experiment_error(
+        self,
+    ):
+        vertexai.init(
+            project=_TEST_PROJECT,
+            location=_TEST_LOCATION,
+            staging_bucket=_TEST_BUCKET_NAME,
+            experiment=_TEST_EXPERIMENT,
+        )
+        vertexai.preview.init(
+            remote=True, autolog=True, cluster=_TEST_PERSISTENT_RESOURCE_CONFIG
+        )
+
+        LogisticRegression = vertexai.preview.remote(_logistic.LogisticRegression)
+        model = LogisticRegression()
+
+        with pytest.raises(ValueError) as e:
+            model.fit.vertex.remote_config.service_account = "GCE"
+            model.fit(_X_TRAIN, _Y_TRAIN)
+        e.match(
+            regexp=r"Persistent cluster currently does not support custom service account."
+        )
+
+    def test_resource_pool_return_spec_dict(self):
+        test_pool = resource_pool_utils._ResourcePool(
+            replica_count=_TEST_REPLICA_COUNT,
+            machine_type=_TEST_MACHINE_TYPE,
+            accelerator_count=_TEST_ACCELERATOR_COUNT,
+            accelerator_type=_TEST_ACCELERATOR_TYPE,
+        )
+        true_spec_dict = {
+            "machine_spec": {
+                "machine_type": _TEST_MACHINE_TYPE,
+                "accelerator_type": _TEST_ACCELERATOR_TYPE,
+                "accelerator_count": _TEST_ACCELERATOR_COUNT,
+            },
+            "replica_count": _TEST_REPLICA_COUNT,
+            "disk_spec": {
+                "boot_disk_type": "pd-ssd",
+                "boot_disk_size_gb": 100,
+            },
+        }
+
+        assert test_pool.spec_dict == true_spec_dict
diff --git a/vertexai/preview/_workflow/executor/persistent_resource_util.py b/vertexai/preview/_workflow/executor/persistent_resource_util.py
index de464a7776..21457125f1 100644
--- a/vertexai/preview/_workflow/executor/persistent_resource_util.py
+++ b/vertexai/preview/_workflow/executor/persistent_resource_util.py
@@ -15,7 +15,7 @@
 import datetime
 import time
-from typing import Optional
+from typing import List, Optional
 
 from google.api_core import exceptions
 from google.api_core import gapic_v1
@@ -35,6 +35,7 @@
 from google.cloud.aiplatform_v1beta1.types.persistent_resource_service import (
     GetPersistentResourceRequest,
 )
+from vertexai.preview.developer import remote_specs
 
 GAPIC_VERSION = aiplatform.__version__
 
@@ -95,7 +96,6 @@ def check_persistent_resource(cluster_resource_name: str) -> bool:
 
 def _default_persistent_resource() -> PersistentResource:
     """Default persistent resource."""
-    # Currently the service accepts only one resource_pool config and image_uri.
     resource_pools = []
     resource_pool = ResourcePool()
     resource_pool.replica_count = _DEFAULT_REPLICA_COUNT
@@ -182,15 +182,32 @@
         time.sleep(sleep_time.total_seconds())
 
 
-def create_persistent_resource(cluster_resource_name: str):
-    """Create a default persistent resource."""
+def create_persistent_resource(
+    cluster_resource_name: str,
+    resource_pools: Optional[List[remote_specs.ResourcePool]] = None,
+):
+    """Create a persistent resource."""
     location = cluster_resource_name.split("/")[3]
     parent = "/".join(cluster_resource_name.split("/")[:4])
     cluster_name = cluster_resource_name.split("/")[-1]
 
     client = _create_persistent_resource_client(location)
-
-    persistent_resource = _default_persistent_resource()
+    if resource_pools is None:
+        persistent_resource = _default_persistent_resource()
+    else:
+        # convert remote_specs.ResourcePool to GAPIC ResourcePool
+        pools = []
+        for resource_pool in resource_pools:
+            pool = ResourcePool()
+            pool.replica_count = resource_pool.replica_count
+            pool.machine_spec.machine_type = resource_pool.machine_type
+            pool.machine_spec.accelerator_type = resource_pool.accelerator_type
+            pool.machine_spec.accelerator_count = resource_pool.accelerator_count
+            pool.disk_spec.boot_disk_type = resource_pool.boot_disk_type
+            pool.disk_spec.boot_disk_size_gb = resource_pool.boot_disk_size_gb
+            pools.append(pool)
+
+        persistent_resource = PersistentResource(resource_pools=pools)
 
     request = persistent_resource_service.CreatePersistentResourceRequest(
         parent=parent,
@@ -205,4 +222,4 @@ def create_persistent_resource(cluster_resource_name: str):
 
     # Check cluster creation progress
     response = _get_persistent_resource(cluster_resource_name)
-    _LOGGER.info(response)
+    _LOGGER.info(f"Cluster {response.display_name} was created successfully.")
diff --git a/vertexai/preview/_workflow/executor/training.py b/vertexai/preview/_workflow/executor/training.py
index d484a5d954..0f248da5e3 100644
--- a/vertexai/preview/_workflow/executor/training.py
+++ b/vertexai/preview/_workflow/executor/training.py
@@ -28,7 +28,7 @@
 from google.cloud import aiplatform
 import vertexai
 from google.cloud.aiplatform import base
-from google.cloud.aiplatform import jobs
+from google.cloud.aiplatform.preview import jobs
 from google.cloud.aiplatform import utils
 from google.cloud.aiplatform.metadata import metadata
 from google.cloud.aiplatform.utils import resource_manager_utils
@@ -291,6 +291,10 @@ def _get_service_account(
         config.service_account or vertexai.preview.global_config.service_account
     )
     if service_account:
+        if vertexai.preview.global_config.cluster_name:
+            raise ValueError(
+                "Persistent cluster currently does not support custom service account."
+            )
         if service_account.lower() == "gce":
             project = vertexai.preview.global_config.project
             project_number = resource_manager_utils.get_project_number(project)
@@ -701,7 +705,7 @@ def remote_training(invokable: shared._Invokable, rewrapper: Any):
     # disable CustomJob logs
     logging.getLogger("google.cloud.aiplatform.jobs").disabled = True
     try:
-        job = aiplatform.CustomJob(
+        job = jobs.CustomJob(
             display_name=display_name,
             project=vertexai.preview.global_config.project,
             location=vertexai.preview.global_config.location,
@@ -709,6 +713,7 @@ def remote_training(invokable: shared._Invokable, rewrapper: Any):
             base_output_dir=remote_job_base_path,
             staging_bucket=remote_job_base_path,
             labels=labels,
+            persistent_resource_id=vertexai.preview.global_config.cluster_name,
         )
 
         job.submit(
diff --git a/vertexai/preview/_workflow/shared/configs.py b/vertexai/preview/_workflow/shared/configs.py
index 07153508f4..eb4f478463 100644
--- a/vertexai/preview/_workflow/shared/configs.py
+++ b/vertexai/preview/_workflow/shared/configs.py
@@ -296,6 +296,11 @@ class PersistentResourceConfig:
             The cluster name of the remote job. This value may be up to 63
             characters, and valid characters are `[a-z0-9_-]`. The first character
             cannot be a number or hyphen.
+        resource_pools (List[vertexai.preview.developer.remote_specs.ResourcePool]):
+            The resource pools configuration for the cluster that runs remote jobs.
     """
 
     name: Optional[str] = None
+    resource_pools: Optional[
+        "vertexai.preview.developer.remote_specs.ResourcePool"  # noqa: F821
+    ] = None
diff --git a/vertexai/preview/developer/remote_specs.py b/vertexai/preview/developer/remote_specs.py
index eb27e38bb4..af4b4bedaf 100644
--- a/vertexai/preview/developer/remote_specs.py
+++ b/vertexai/preview/developer/remote_specs.py
@@ -29,6 +29,7 @@
 from google.cloud.aiplatform import base
 from google.cloud.aiplatform.utils import gcs_utils
 from google.cloud.aiplatform.utils import worker_spec_utils
+from google.cloud.aiplatform.preview import resource_pool_utils
 from vertexai.preview._workflow.serialization_engine import (
     serializers,
 )
@@ -860,3 +861,28 @@ def my_train_method(self, ...):
         f"Initialized process rank: {rank}, world_size: {world_size}, device: {device}",
     )
     return model
+
+
+# pylint: disable=protected-access
+class ResourcePool(resource_pool_utils._ResourcePool):
+    """Wrapper class that holds a resource pool specification.
+
+    Attributes:
+        replica_count (int):
+            The number of worker replicas.
+        machine_type (str):
+            The type of machine to use for remote training.
+        accelerator_count (int):
+            The number of accelerators to attach to a worker replica.
+        accelerator_type (str):
+            Hardware accelerator type. One of ACCELERATOR_TYPE_UNSPECIFIED,
+            NVIDIA_TESLA_A100, NVIDIA_TESLA_P100, NVIDIA_TESLA_V100,
+            NVIDIA_TESLA_K80, NVIDIA_TESLA_T4, NVIDIA_TESLA_P4
+        boot_disk_type (str):
+            Type of the boot disk (default is `pd-ssd`).
+            Valid values: `pd-ssd` (Persistent Disk Solid State Drive) or
+            `pd-standard` (Persistent Disk Hard Disk Drive).
+        boot_disk_size_gb (int):
+            Size in GB of the boot disk (default is 100GB).
+            Boot disk size must be within the range of [100, 64000].
+ """ diff --git a/vertexai/preview/initializer.py b/vertexai/preview/initializer.py index 88bb1aba21..5023e36455 100644 --- a/vertexai/preview/initializer.py +++ b/vertexai/preview/initializer.py @@ -20,6 +20,7 @@ from vertexai.preview._workflow.executor import ( persistent_resource_util, ) +from vertexai.preview._workflow.shared import configs _LOGGER = base.Logger(__name__) @@ -37,6 +38,7 @@ def init( *, remote: Optional[bool] = None, autolog: Optional[bool] = None, + cluster: Optional[configs.PersistentResourceConfig] = None, ): """Updates preview global parameters for Vertex remote execution. @@ -49,6 +51,24 @@ def init( Optional. Whether or not to turn on autologging feature for remote execution. To learn more about the autologging feature, see https://cloud.google.com/vertex-ai/docs/experiments/autolog-data. + cluster (PersistentResourceConfig): + Optional. If passed, check if the cluster exists. If not, create + a default one (single node, "n1-standard-4", no GPU) with the + given name. Then use the cluster to run CustomJobs. Default is + None. Example usage: + from vertexai.preview.shared.configs import PersistentResourceConfig + cluster = PersistentResourceConfig( + name="my-cluster-1", + resource_pools=[ + ResourcePool(replica_count=1,), + ResourcePool( + machine_type="n1-standard-8", + replica_count=2, + accelerator_type="NVIDIA_TESLA_P100", + accelerator_count=1, + ), + ] + ) """ if remote is not None: self._remote = remote @@ -58,8 +78,11 @@ def init( elif autolog is False: aiplatform.autolog(disable=True) - cluster = None if cluster is not None: + if aiplatform.initializer.global_config.service_account is not None: + raise ValueError( + "Persistent cluster currently does not support custom service account" + ) self._cluster_name = cluster.name cluster_resource_name = f"projects/{self.project}/locations/{self.location}/persistentResources/{self._cluster_name}" cluster_exists = persistent_resource_util.check_persistent_resource( @@ -68,9 +91,10 @@ def init( if cluster_exists: _LOGGER.info(f"Using existing cluster: {cluster_resource_name}") return - # create a default one + # create a cluster persistent_resource_util.create_persistent_resource( - cluster_resource_name=cluster_resource_name + cluster_resource_name=cluster_resource_name, + resource_pools=cluster.resource_pools, ) @property