Skip to content

Commit

Permalink
feat: Create Vertex Experiment when uploading Tensorboard logs
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 634035336
  • Loading branch information
vertex-sdk-bot authored and Copybara-Service committed May 15, 2024
1 parent b47e6ff commit 339f8b6
Show file tree
Hide file tree
Showing 10 changed files with 435 additions and 174 deletions.
11 changes: 9 additions & 2 deletions google/cloud/aiplatform/metadata/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ def update(
metadata: Optional[Dict] = None,
description: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
location: Optional[str] = None,
):
"""Updates an existing Metadata Context with new metadata.
Expand All @@ -307,7 +308,10 @@ def update(
for _ in range(_ETAG_ERROR_MAX_RETRY_COUNT - 1):
try:
super().update(
metadata=metadata, description=description, credentials=credentials
metadata=metadata,
description=description,
credentials=credentials,
location=location,
)
return
except Aborted as aborted_exception:
Expand All @@ -322,7 +326,10 @@ def update(

# Expose result/exception directly in the last retry.
super().update(
metadata=metadata, description=description, credentials=credentials
metadata=metadata,
description=description,
credentials=credentials,
location=location,
)

@classmethod
Expand Down
3 changes: 2 additions & 1 deletion google/cloud/aiplatform/metadata/experiment_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,8 @@ def assign_backing_tensorboard(
self._metadata_context.update(
metadata={
constants._BACKING_TENSORBOARD_RESOURCE_KEY: tensorboard.resource_name
}
},
location=self._metadata_context.location,
)

def _log_experiment_loggable(self, experiment_loggable: "_ExperimentLoggable"):
Expand Down
25 changes: 21 additions & 4 deletions google/cloud/aiplatform/metadata/experiment_run_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,9 @@ def create(
The newly created experiment run.
"""

experiment = cls._get_experiment(experiment)
experiment = cls._get_experiment(
experiment, project=project, location=location, credentials=credentials
)

run_id = _format_experiment_run_resource_id(
experiment_name=experiment.name, run_name=run_name
Expand Down Expand Up @@ -760,7 +762,10 @@ def _create_context():
try:
if tensorboard:
cls._assign_backing_tensorboard(
self=experiment_run, tensorboard=tensorboard
self=experiment_run,
tensorboard=tensorboard,
project=project,
location=location,
)
else:
cls._assign_to_experiment_backing_tensorboard(self=experiment_run)
Expand Down Expand Up @@ -792,7 +797,10 @@ def _format_tensorboard_experiment_display_name(experiment_name: str) -> str:
return f"{experiment_name} Backing Tensorboard Experiment"

def _assign_backing_tensorboard(
self, tensorboard: Union[tensorboard_resource.Tensorboard, str]
self,
tensorboard: Union[tensorboard_resource.Tensorboard, str],
project: Optional[str] = None,
location: Optional[str] = None,
):
"""Assign tensorboard as the backing tensorboard to this run.
Expand All @@ -802,7 +810,10 @@ def _assign_backing_tensorboard(
"""
if isinstance(tensorboard, str):
tensorboard = tensorboard_resource.Tensorboard(
tensorboard, credentials=self._metadata_node.credentials
tensorboard,
project=project,
location=location,
credentials=self._metadata_node.credentials,
)

tensorboard_resource_name_parts = tensorboard._parse_resource_name(
Expand All @@ -827,6 +838,8 @@ def _assign_backing_tensorboard(
self._experiment.name
),
tensorboard_name=tensorboard.resource_name,
project=project,
location=location,
credentials=tensorboard.credentials,
labels=constants._VERTEX_EXPERIMENT_TB_EXPERIMENT_LABEL,
)
Expand All @@ -849,6 +862,8 @@ def _assign_backing_tensorboard(
tensorboard_run = tensorboard_resource.TensorboardRun.create(
tensorboard_run_id=self._run_name,
tensorboard_experiment_name=tensorboard_experiment.resource_name,
project=project,
location=location,
credentials=tensorboard.credentials,
)

Expand All @@ -865,6 +880,8 @@ def _assign_backing_tensorboard(
schema_title=constants._TENSORBOARD_RUN_REFERENCE_ARTIFACT.schema_title,
schema_version=constants._TENSORBOARD_RUN_REFERENCE_ARTIFACT.schema_version,
state=gca_artifact.Artifact.State.LIVE,
project=project,
location=location,
)

self._metadata_node.add_artifacts_and_executions(
Expand Down
13 changes: 12 additions & 1 deletion google/cloud/aiplatform/metadata/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ def set_experiment(
backing_tensorboard: Optional[
Union[str, tensorboard_resource.Tensorboard, bool]
] = None,
project: Optional[str] = None,
location: Optional[str] = None,
):
"""Set the experiment. Will retrieve the Experiment if it exists or create one with the provided name.
Expand All @@ -309,11 +311,20 @@ def set_experiment(
To disable using a backing tensorboard, set `backing_tensorboard` to `False`.
To maintain this behavior, set `experiment_tensorboard` to `False` in subsequent calls to aiplatform.init().
project (str):
Optional. Project where this experiment will be retrieved from or created. Overrides project set in
aiplatform.init.
location (str):
Optional. Location where this experiment will be retrieved from or created. Overrides location set in
aiplatform.init.
"""
self.reset()

experiment = experiment_resources.Experiment.get_or_create(
experiment_name=experiment, description=description
experiment_name=experiment,
description=description,
project=project,
location=location,
)

if backing_tensorboard and not isinstance(backing_tensorboard, bool):
Expand Down
5 changes: 4 additions & 1 deletion google/cloud/aiplatform/metadata/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ def update(
metadata: Optional[Dict] = None,
description: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
location: Optional[str] = None,
):
"""Updates an existing Metadata resource with new metadata.
Expand All @@ -309,7 +310,9 @@ def update(
if description:
gca_resource.description = description

api_client = self._instantiate_client(credentials=credentials)
api_client = self._instantiate_client(
credentials=credentials, location=location
)
# TODO: if etag is not valid sync and retry
update_gca_resource = self._update_resource(
client=api_client,
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/aiplatform/tensorboard/logdir_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,14 @@ def synchronize_runs(self):
In addition, any existing `DirectoryLoader` whose run directory
no longer exists will be deleted.
Run names are also rewritten to satisfy Vertex Experiments naming restrictions: `/` and `_` are replaced with `-`.
"""
logger.info("Starting logdir traversal of %s", self._logdir)
runs_seen = set()
for subdir in io_wrapper.GetLogdirSubdirectories(self._logdir):
run = os.path.relpath(subdir, self._logdir)
run = run.replace("/", "-").replace("_", "-")
runs_seen.add(run)
if run not in self._directory_loaders:
logger.info("- Adding run for relative directory %s", run)
Expand Down
67 changes: 32 additions & 35 deletions google/cloud/aiplatform/tensorboard/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,21 @@
from collections import defaultdict
import functools
import logging
import os
import re
import time
from typing import ContextManager, Dict, FrozenSet, Generator, Iterable, Optional, Tuple
import uuid

from google.api_core import exceptions
from google.cloud import storage
from google.cloud.aiplatform import base
from google.cloud.aiplatform.compat.services import (
tensorboard_service_client,
)
from google.cloud.aiplatform.compat.types import tensorboard_data
from google.cloud.aiplatform.compat.types import tensorboard_experiment
from google.cloud.aiplatform.compat.types import tensorboard_service
from google.cloud.aiplatform.compat.types import tensorboard_time_series
from google.cloud.aiplatform.metadata import experiment_resources
from google.cloud.aiplatform.metadata import metadata
from google.cloud.aiplatform.tensorboard import logdir_loader
from google.cloud.aiplatform.tensorboard import upload_tracker
from google.cloud.aiplatform.tensorboard import uploader_constants
Expand Down Expand Up @@ -215,47 +214,45 @@ def active_filter(secs):

self._create_additional_senders()

def _create_or_get_experiment(self) -> tensorboard_experiment.TensorboardExperiment:
"""Create an experiment or get an experiment.
Attempts to create an experiment. If the experiment already exists and
creation fails then the experiment will be retrieved.
def create_experiment(self):
"""Creates an Experiment for this upload session.
Returns:
The created or retrieved experiment.
Sets the Tensorboard resource and the experiment on the SDK's experiment tracker; this gets or
creates a Vertex Experiment and associates it with a backing Tensorboard Experiment.
"""
logger.info("Creating experiment")
m = self._api.parse_tensorboard_path(self._tensorboard_resource_name)

tb_experiment = tensorboard_experiment.TensorboardExperiment(
description=self._description, display_name=self._experiment_display_name
existing_experiment = experiment_resources.Experiment.get(
experiment_name=self._experiment_name,
project=m["project"],
location=m["location"],
)

try:
experiment = self._api.create_tensorboard_experiment(
parent=self._tensorboard_resource_name,
tensorboard_experiment=tb_experiment,
tensorboard_experiment_id=self._experiment_name,
)
if not existing_experiment:
self._is_brand_new_experiment = True
except exceptions.AlreadyExists:
logger.info("Creating experiment failed. Retrieving experiment.")
experiment_name = os.path.join(
self._tensorboard_resource_name, "experiments", self._experiment_name
)
experiment = self._api.get_tensorboard_experiment(name=experiment_name)
return experiment

def create_experiment(self):
"""Creates an Experiment for this upload session and returns the ID."""
metadata._experiment_tracker.reset()
metadata._experiment_tracker.set_tensorboard(
tensorboard=self._tensorboard_resource_name,
project=m["project"],
location=m["location"],
)
metadata._experiment_tracker.set_experiment(
project=m["project"],
location=m["location"],
experiment=self._experiment_name,
description=self._description,
backing_tensorboard=self._tensorboard_resource_name,
)

experiment = self._create_or_get_experiment()
self._experiment = experiment
self._tensorboard_experiment_resource_name = (
f"{self._tensorboard_resource_name}/experiments/{self._experiment_name}"
)
self._one_platform_resource_manager = uploader_utils.OnePlatformResourceManager(
self._experiment.name, self._api
self._tensorboard_experiment_resource_name, self._api
)

self._request_sender = _BatchedRequestSender(
self._experiment.name,
self._tensorboard_experiment_resource_name,
self._api,
allowed_plugins=self._allowed_plugins,
upload_limits=self._upload_limits,
Expand All @@ -271,7 +268,7 @@ def create_experiment(self):
# Update partials with experiment name
for sender in self._additional_senders.keys():
self._additional_senders[sender] = self._additional_senders[sender](
experiment_resource_name=self._experiment.name,
experiment_resource_name=self._tensorboard_experiment_resource_name,
)

self._dispatcher = _Dispatcher(
Expand Down Expand Up @@ -310,7 +307,7 @@ def _create_additional_senders(self) -> Dict[str, uploader_utils.RequestSender]:
)

def get_experiment_resource_name(self):
return self._experiment.name
return self._tensorboard_experiment_resource_name

def start_uploading(self):
"""Blocks forever to continuously upload data from the logdir.
Expand Down
Loading

0 comments on commit 339f8b6

Please sign in to comment.