Skip to content

Commit

Permalink
feat: add tensorboard support to custom job and hyperparameter tuning…
Browse files Browse the repository at this point in the history
… job (#404)
  • Loading branch information
sasha-gitg authored May 18, 2021
1 parent aab9e58 commit fa9bc39
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 10 deletions.
60 changes: 58 additions & 2 deletions google/cloud/aiplatform/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@
batch_prediction_job_v1 as gca_bp_job_v1,
batch_prediction_job_v1beta1 as gca_bp_job_v1beta1,
custom_job as gca_custom_job_compat,
custom_job_v1beta1 as gca_custom_job_v1beta1,
explanation_v1beta1 as gca_explanation_v1beta1,
io as gca_io_compat,
io_v1beta1 as gca_io_v1beta1,
job_state as gca_job_state,
hyperparameter_tuning_job as gca_hyperparameter_tuning_job_compat,
hyperparameter_tuning_job_v1beta1 as gca_hyperparameter_tuning_job_v1beta1,
machine_resources as gca_machine_resources_compat,
machine_resources_v1beta1 as gca_machine_resources_v1beta1,
study as gca_study_compat,
Expand Down Expand Up @@ -1132,6 +1134,7 @@ def run(
network: Optional[str] = None,
timeout: Optional[int] = None,
restart_job_on_worker_restart: bool = False,
tensorboard: Optional[str] = None,
sync: bool = True,
) -> None:
"""Run this configured CustomJob.
Expand All @@ -1152,6 +1155,21 @@ def run(
gets restarted. This feature can be used by
distributed training jobs that are not resilient
to workers leaving and joining a job.
tensorboard (str):
Optional. The name of an AI Platform
[Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard]
resource to which this CustomJob will upload Tensorboard
logs. Format:
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
The training script should write Tensorboard to following AI Platform environment
variable:
AIP_TENSORBOARD_LOG_DIR
`service_account` is required with provided `tensorboard`.
For more information on configuring your service account please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
sync (bool):
Whether to execute this method synchronously. If False, this method
will unblock and it will be executed in a concurrent Future.
Expand All @@ -1170,9 +1188,18 @@ def run(
restart_job_on_worker_restart=restart_job_on_worker_restart,
)

if tensorboard:
v1beta1_gca_resource = gca_custom_job_v1beta1.CustomJob()
v1beta1_gca_resource._pb.MergeFromString(
self._gca_resource._pb.SerializeToString()
)
self._gca_resource = v1beta1_gca_resource
self._gca_resource.job_spec.tensorboard = tensorboard

_LOGGER.log_create_with_lro(self.__class__)

self._gca_resource = self.api_client.create_custom_job(
version = "v1beta1" if tensorboard else "v1"
self._gca_resource = self.api_client.select_version(version).create_custom_job(
parent=self._parent, custom_job=self._gca_resource
)

Expand Down Expand Up @@ -1415,6 +1442,7 @@ def run(
network: Optional[str] = None,
timeout: Optional[int] = None, # seconds
restart_job_on_worker_restart: bool = False,
tensorboard: Optional[str] = None,
sync: bool = True,
) -> None:
"""Run this configured CustomJob.
Expand All @@ -1435,6 +1463,21 @@ def run(
gets restarted. This feature can be used by
distributed training jobs that are not resilient
to workers leaving and joining a job.
tensorboard (str):
Optional. The name of an AI Platform
[Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard]
resource to which this CustomJob will upload Tensorboard
logs. Format:
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
The training script should write Tensorboard to following AI Platform environment
variable:
AIP_TENSORBOARD_LOG_DIR
`service_account` is required with provided `tensorboard`.
For more information on configuring your service account please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
sync (bool):
Whether to execute this method synchronously. If False, this method
will unblock and it will be executed in a concurrent Future.
Expand All @@ -1453,9 +1496,22 @@ def run(
restart_job_on_worker_restart=restart_job_on_worker_restart,
)

if tensorboard:
v1beta1_gca_resource = (
gca_hyperparameter_tuning_job_v1beta1.HyperparameterTuningJob()
)
v1beta1_gca_resource._pb.MergeFromString(
self._gca_resource._pb.SerializeToString()
)
self._gca_resource = v1beta1_gca_resource
self._gca_resource.trial_job_spec.tensorboard = tensorboard

_LOGGER.log_create_with_lro(self.__class__)

self._gca_resource = self.api_client.create_hyperparameter_tuning_job(
version = "v1beta1" if tensorboard else "v1"
self._gca_resource = self.api_client.select_version(
version
).create_hyperparameter_tuning_job(
parent=self._parent, hyperparameter_tuning_job=self._gca_resource
)

Expand Down
71 changes: 70 additions & 1 deletion tests/unit/aiplatform/test_custom_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,18 @@

from google.cloud import aiplatform
from google.cloud.aiplatform.compat.types import custom_job as gca_custom_job_compat
from google.cloud.aiplatform.compat.types import (
custom_job_v1beta1 as gca_custom_job_v1beta1,
)
from google.cloud.aiplatform.compat.types import io as gca_io_compat
from google.cloud.aiplatform.compat.types import job_state as gca_job_state_compat
from google.cloud.aiplatform.compat.types import (
encryption_spec as gca_encryption_spec_compat,
)
from google.cloud.aiplatform_v1.services.job_service import client as job_service_client
from google.cloud.aiplatform_v1beta1.services.job_service import (
client as job_service_client_v1beta1,
)

_TEST_PROJECT = "test-project"
_TEST_LOCATION = "us-central1"
Expand All @@ -44,6 +50,7 @@
_TEST_PARENT = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}"

_TEST_CUSTOM_JOB_NAME = f"{_TEST_PARENT}/customJobs/{_TEST_ID}"
_TEST_TENSORBOARD_NAME = f"{_TEST_PARENT}/tensorboards/{_TEST_ID}"

_TEST_TRAINING_CONTAINER_IMAGE = "gcr.io/test-training/container:image"

Expand Down Expand Up @@ -97,11 +104,20 @@
)


def _get_custom_job_proto(state=None, name=None, error=None):
def _get_custom_job_proto(state=None, name=None, error=None, version="v1"):
custom_job_proto = copy.deepcopy(_TEST_BASE_CUSTOM_JOB_PROTO)
custom_job_proto.name = name
custom_job_proto.state = state
custom_job_proto.error = error

if version == "v1beta1":
v1beta1_custom_job_proto = gca_custom_job_v1beta1.CustomJob()
v1beta1_custom_job_proto._pb.MergeFromString(
custom_job_proto._pb.SerializeToString()
)
custom_job_proto = v1beta1_custom_job_proto
custom_job_proto.job_spec.tensorboard = _TEST_TENSORBOARD_NAME

return custom_job_proto


Expand Down Expand Up @@ -162,6 +178,19 @@ def create_custom_job_mock():
yield create_custom_job_mock


@pytest.fixture
def create_custom_job_v1beta1_mock():
with mock.patch.object(
job_service_client_v1beta1.JobServiceClient, "create_custom_job"
) as create_custom_job_mock:
create_custom_job_mock.return_value = _get_custom_job_proto(
name=_TEST_CUSTOM_JOB_NAME,
state=gca_job_state_compat.JobState.JOB_STATE_PENDING,
version="v1beta1",
)
yield create_custom_job_mock


class TestCustomJob:
def setup_method(self):
reload(aiplatform.initializer)
Expand Down Expand Up @@ -321,3 +350,43 @@ def test_create_from_local_script_raises_with_no_staging_bucket(
script_path=test_training_jobs._TEST_LOCAL_SCRIPT_FILE_NAME,
container_uri=_TEST_TRAINING_CONTAINER_IMAGE,
)

@pytest.mark.parametrize("sync", [True, False])
def test_create_custom_job_with_tensorboard(
self, create_custom_job_v1beta1_mock, get_custom_job_mock, sync
):

aiplatform.init(
project=_TEST_PROJECT,
location=_TEST_LOCATION,
staging_bucket=_TEST_STAGING_BUCKET,
encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
)

job = aiplatform.CustomJob(
display_name=_TEST_DISPLAY_NAME, worker_pool_specs=_TEST_WORKER_POOL_SPEC
)

job.run(
service_account=_TEST_SERVICE_ACCOUNT,
tensorboard=_TEST_TENSORBOARD_NAME,
network=_TEST_NETWORK,
timeout=_TEST_TIMEOUT,
restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART,
sync=sync,
)

job.wait()

expected_custom_job = _get_custom_job_proto(version="v1beta1")

create_custom_job_v1beta1_mock.assert_called_once_with(
parent=_TEST_PARENT, custom_job=expected_custom_job
)

expected_custom_job = _get_custom_job_proto()

assert job.job_spec == expected_custom_job.job_spec
assert (
job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_SUCCEEDED
)
113 changes: 106 additions & 7 deletions tests/unit/aiplatform/test_hyperparameter_tuning_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@
)
from google.cloud.aiplatform.compat.types import (
hyperparameter_tuning_job as gca_hyperparameter_tuning_job_compat,
hyperparameter_tuning_job_v1beta1 as gca_hyperparameter_tuning_job_v1beta1,
)
from google.cloud.aiplatform.compat.types import study as gca_study_compat
from google.cloud.aiplatform_v1.services.job_service import client as job_service_client
from google.cloud.aiplatform_v1beta1.services.job_service import (
client as job_service_client_v1beta1,
)

import test_custom_job

Expand Down Expand Up @@ -122,12 +126,29 @@
)


def _get_hyperparameter_tuning_job_proto(state=None, name=None, error=None):
custom_job_proto = copy.deepcopy(_TEST_BASE_HYPERPARAMETER_TUNING_JOB_PROTO)
custom_job_proto.name = name
custom_job_proto.state = state
custom_job_proto.error = error
return custom_job_proto
def _get_hyperparameter_tuning_job_proto(
state=None, name=None, error=None, version="v1"
):
hyperparameter_tuning_job_proto = copy.deepcopy(
_TEST_BASE_HYPERPARAMETER_TUNING_JOB_PROTO
)
hyperparameter_tuning_job_proto.name = name
hyperparameter_tuning_job_proto.state = state
hyperparameter_tuning_job_proto.error = error

if version == "v1beta1":
v1beta1_hyperparameter_tuning_job_proto = (
gca_hyperparameter_tuning_job_v1beta1.HyperparameterTuningJob()
)
v1beta1_hyperparameter_tuning_job_proto._pb.MergeFromString(
hyperparameter_tuning_job_proto._pb.SerializeToString()
)
hyperparameter_tuning_job_proto = v1beta1_hyperparameter_tuning_job_proto
hyperparameter_tuning_job_proto.trial_job_spec.tensorboard = (
test_custom_job._TEST_TENSORBOARD_NAME
)

return hyperparameter_tuning_job_proto


@pytest.fixture
Expand Down Expand Up @@ -187,7 +208,20 @@ def create_hyperparameter_tuning_job_mock():
yield create_hyperparameter_tuning_job_mock


class TestCustomJob:
@pytest.fixture
def create_hyperparameter_tuning_job_v1beta1_mock():
with mock.patch.object(
job_service_client_v1beta1.JobServiceClient, "create_hyperparameter_tuning_job"
) as create_hyperparameter_tuning_job_mock:
create_hyperparameter_tuning_job_mock.return_value = _get_hyperparameter_tuning_job_proto(
name=_TEST_HYPERPARAMETERTUNING_JOB_NAME,
state=gca_job_state_compat.JobState.JOB_STATE_PENDING,
version="v1beta1",
)
yield create_hyperparameter_tuning_job_mock


class TestHyperparameterTuningJob:
def setup_method(self):
reload(aiplatform.initializer)
reload(aiplatform)
Expand Down Expand Up @@ -366,3 +400,68 @@ def test_get_hyperparameter_tuning_job(self, get_hyperparameter_tuning_job_mock)
assert (
job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_PENDING
)

@pytest.mark.parametrize("sync", [True, False])
def test_create_hyperparameter_tuning_job_with_tensorboard(
self,
create_hyperparameter_tuning_job_v1beta1_mock,
get_hyperparameter_tuning_job_mock,
sync,
):

aiplatform.init(
project=_TEST_PROJECT,
location=_TEST_LOCATION,
staging_bucket=_TEST_STAGING_BUCKET,
encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
)

custom_job = aiplatform.CustomJob(
display_name=test_custom_job._TEST_DISPLAY_NAME,
worker_pool_specs=test_custom_job._TEST_WORKER_POOL_SPEC,
)

job = aiplatform.HyperparameterTuningJob(
display_name=_TEST_DISPLAY_NAME,
custom_job=custom_job,
metric_spec={_TEST_METRIC_SPEC_KEY: _TEST_METRIC_SPEC_VALUE},
parameter_spec={
"lr": hpt.DoubleParameterSpec(min=0.001, max=0.1, scale="log"),
"units": hpt.IntegerParameterSpec(min=4, max=1028, scale="linear"),
"activation": hpt.CategoricalParameterSpec(
values=["relu", "sigmoid", "elu", "selu", "tanh"]
),
"batch_size": hpt.DiscreteParameterSpec(
values=[16, 32], scale="linear"
),
},
parallel_trial_count=_TEST_PARALLEL_TRIAL_COUNT,
max_trial_count=_TEST_MAX_TRIAL_COUNT,
max_failed_trial_count=_TEST_MAX_FAILED_TRIAL_COUNT,
search_algorithm=_TEST_SEARCH_ALGORITHM,
measurement_selection=_TEST_MEASUREMENT_SELECTION,
)

job.run(
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
timeout=_TEST_TIMEOUT,
restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART,
tensorboard=test_custom_job._TEST_TENSORBOARD_NAME,
sync=sync,
)

job.wait()

expected_hyperparameter_tuning_job = _get_hyperparameter_tuning_job_proto(
version="v1beta1"
)

create_hyperparameter_tuning_job_v1beta1_mock.assert_called_once_with(
parent=_TEST_PARENT,
hyperparameter_tuning_job=expected_hyperparameter_tuning_job,
)

assert (
job._gca_resource.state == gca_job_state_compat.JobState.JOB_STATE_SUCCEEDED
)

0 comments on commit fa9bc39

Please sign in to comment.