Commit 0e934f9
Migrate to kfp 2 (#170)
* Vertex AI pipelines client: list pipelines.
* Very initial version of run once.
* Pass parameters to pipeline job.
* Add type annotation to task passed to _configure_resources.
* Remove image pull policy parameter as it's not supported.
* Mlflow component.
* Add mlflow component and pass run id to downstream components.
* Rename ops to tasks.
* Wait for completion.
* Remove io.py file as it's not used anyway.
* Pre-commit fixes.
* Test if resources are correctly added to the pipeline spec.
* Test that resources section is not added to spec if it wasn't specified.
* Test grouping nodes.
* Test adds mlflow task.
* Test runner and runner config in args.
* Finish other tests.
* Add unittest.skip to test_should_remove_old_schedule for now.
* List pipelines test.
* Test compile.
* Patch mlflow is enabled when setting mlflow tags.
* Update test_run_once_with_wait test.
* Update changelog.
* Remove image_pull_policy from the config and add the description of this change to changelog.
* Remove 'layer' parameter from datasets as it's deprecated.
* Changelog: add description of the --timeout-seconds flag removal.
* Soften kfp version requirement, modify one import so that it works using multiple kfp versions.
* Remove commented-out line.
* Remove passing image_pull_policy to compile.
* Update python version in cicd.
1 parent 56fff64 commit 0e934f9

File tree

21 files changed (+581 −1,024 lines)


.github/workflows/prepare-release.yml

+1 −1

@@ -13,7 +13,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: [3.8]
+        python-version: [3.10]
     env:
       PYTHON_PACKAGE: kedro_vertexai
     steps:

.github/workflows/test_and_publish.yml

+2 −2

@@ -105,7 +105,7 @@ jobs:
     strategy:
       matrix:
         e2e_case: ["standard", "grouping"]
-        python-version: ['3.8'] # todo update python
+        python-version: ['3.10']
     env:
       PROJECT_NAME: kedro-vertexai
       IMAGE_REGISTRY: gcr.io/gid-ml-ops-sandbox
@@ -191,7 +191,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: ['3.8']
+        python-version: ['3.10']
     env:
       PYTHON_PACKAGE: kedro_vertexai
     steps:

CHANGELOG.md

+8 −1

@@ -1,6 +1,13 @@
 # Changelog
 
-## [Unreleased]
+## [Unreleased] 2024-07-23
+
+- Migrated to kfp 2
+- Removed `image_pull_policy` parameter from configuration, as it only applies to Kubernetes backend and not Vertex AI,
+  and it's only available in `kfp-kubernetes` extension package
+- Removed `--timeout-seconds` parameter from `run-once` command for now, as in the old version of the plugin exceeding the specified time
+  didn't alter the remote pipeline execution, and only escaped the local Python process. The timeout functionality will be added later on,
+  with the proper remote pipeline execution handling, and possibly per-task timeout enabled by [the new kfp feature](https://github.com/kubeflow/pipelines/pull/10481).
 
 ## [0.11.1] - 2024-07-01
docs/source/02_installation/02_configuration.md

−4

@@ -10,10 +10,6 @@ run_config:
   # Name of the image to run as the pipeline steps
   image: eu.gcr.io/my-gcp-mlops-project/example_model:${oc.env:KEDRO_CONFIG_COMMIT_ID}
 
-  # Pull policy to be used for the steps. Use Always if you push the images
-  # on the same tag, or Never if you use only local images
-  image_pull_policy: IfNotPresent
-
   # Location of Vertex AI GCS root
   root: bucket_name/gcs_suffix

docs/source/03_getting_started/01_quickstart.md

−3

@@ -108,17 +108,14 @@ Adjusted `catalog.yml` should look like this (note: remove the rest of the entries):
 companies:
   type: pandas.CSVDataSet
   filepath: data/01_raw/companies.csv
-  layer: raw
 
 reviews:
   type: pandas.CSVDataSet
   filepath: data/01_raw/reviews.csv
-  layer: raw
 
 shuttles:
   type: pandas.ExcelDataSet
   filepath: data/01_raw/shuttles.xlsx
-  layer: raw
 ```
 
 All intermediate and output data will be stored in the location with the following pattern:
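The `layer` key removed above is deprecated in recent Kedro releases. If layer information is still wanted (e.g. for Kedro-Viz grouping), newer Kedro versions accept it via dataset `metadata` instead — a sketch of one equivalent entry under that assumption, not part of this commit:

```yaml
companies:
  type: pandas.CSVDataSet
  filepath: data/01_raw/companies.csv
  metadata:
    kedro-viz:
      layer: raw
```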

kedro_vertexai/cli.py

+3 −29

@@ -8,9 +8,8 @@
 
 from .client import VertexAIPipelinesClient
 from .config import PluginConfig, RunConfig
-from .constants import KEDRO_VERTEXAI_BLOB_TEMP_DIR_NAME, VERTEXAI_RUN_ID_TAG
+from .constants import VERTEXAI_RUN_ID_TAG
 from .context_helper import ContextHelper
-from .data_models import PipelineResult
 from .utils import (
     docker_build,
     docker_push,
@@ -98,14 +97,6 @@ def list_pipelines(ctx):
     help="Parameters override in form of `key=value`",
 )
 @click.option("--wait-for-completion", type=bool, is_flag=True, default=False)
-@click.option(
-    "--timeout-seconds",
-    type=int,
-    default=1800,
-    help="If --wait-for-completion is used, "
-    "this option sets timeout after which the plugin will return non-zero exit code "
-    "if the pipeline does not finish in time",
-)
 @click.pass_context
 def run_once(
     ctx: Context,
@@ -115,7 +106,6 @@ def run_once(
     pipeline: str,
     params: list,
     wait_for_completion: bool,
-    timeout_seconds: int,
 ):
     """Deploy pipeline as a single run within given experiment
     Config can be specified in kubeflow.yml as well."""
@@ -143,29 +133,14 @@ def run_once(
             Consider using '--auto-build' parameter."
         )
 
-    run = client.run_once(
+    job = client.run_once(
         pipeline=pipeline,
         image=image,
-        image_pull_policy=config.image_pull_policy,
         parameters=format_params(params),
     )
 
-    click.echo(
-        f"Intermediate data datasets will be stored in {os.linesep}"
-        f"gs://{config.root.strip('/')}/{KEDRO_VERTEXAI_BLOB_TEMP_DIR_NAME}/{run['displayName']}/*.bin"
-    )
-
     if wait_for_completion:
-        result: PipelineResult = client.wait_for_completion(
-            timeout_seconds
-        )  # blocking call
-        if result.is_success:
-            logger.info("Pipeline finished successfully!")
-            exit_code = 0
-        else:
-            logger.error(f"Pipeline finished with status: {result.state}")
-            exit_code = 1
-        ctx.exit(exit_code)
+        job.wait()
 
 
 @vertexai_group.command()
@@ -210,7 +185,6 @@ def compile(ctx, image, pipeline, output) -> None:
 
     context_helper.vertexai_client.compile(
         pipeline=pipeline,
-        image_pull_policy=config.image_pull_policy,
         image=image if image else config.image,
         output=output,
     )
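The `--params` overrides above are fed through `format_params` from `.utils`, which is outside this diff. A minimal sketch of the kind of `key=value` parsing such a helper implies — the name `parse_params` and the exact rules here are assumptions, not the plugin's actual implementation:

```python
def parse_params(params):
    # Hypothetical stand-in for the plugin's format_params helper:
    # split each "key=value" override on the first "=" only, so that
    # values containing "=" survive intact.
    overrides = {}
    for param in params:
        key, sep, value = param.partition("=")
        if not sep or not key:
            raise ValueError(f"expected key=value, got {param!r}")
        overrides[key] = value
    return overrides
```

For example, `parse_params(["lr=0.01", "tag=a=b"])` yields `{"lr": "0.01", "tag": "a=b"}`.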

kedro_vertexai/client.py

+19 −87

@@ -6,20 +6,17 @@
 import json
 import logging
 import os
-import threading
-from queue import Empty, Queue
 from tempfile import NamedTemporaryFile
-from time import sleep
 
+from google.cloud import aiplatform as aip
+from google.cloud.aiplatform import PipelineJob
 from google.cloud.scheduler_v1.services.cloud_scheduler import (
     CloudSchedulerClient,
 )
-from kfp.v2 import compiler
-from kfp.v2.google.client import AIPlatformClient
+from kfp import compiler
 from tabulate import tabulate
 
 from .config import PluginConfig
-from .data_models import PipelineResult, PipelineStatus
 from .generator import PipelineGenerator
 
@@ -32,9 +29,7 @@ class VertexAIPipelinesClient:
 
     def __init__(self, config: PluginConfig, project_name, context):
 
-        self.api_client = AIPlatformClient(
-            project_id=config.project_id, region=config.region
-        )
+        aip.init(project=config.project_id, location=config.region)
         self.cloud_scheduler_client = CloudSchedulerClient()
         self.location = f"projects/{config.project_id}/locations/{config.region}"
         self.run_config = config.run_config
@@ -46,58 +41,50 @@ def list_pipelines(self):
         List all the jobs (current and historical) on Vertex AI Pipelines
         :return:
         """
-        list_jobs_response = self.api_client.list_jobs()
-        self.log.debug(list_jobs_response)
-
-        jobs_key = "pipelineJobs"
         headers = ["Name", "ID"]
-        data = (
-            map(
-                lambda x: [x.get("displayName"), x["name"]],
-                list_jobs_response[jobs_key],
-            )
-            if jobs_key in list_jobs_response
-            else []
-        )
+
+        list_jobs_response = aip.PipelineJob.list()
+        data = [(x.display_name, x.name) for x in list_jobs_response]
 
         return tabulate(data, headers=headers)
 
     def run_once(
         self,
         pipeline,
         image,
-        image_pull_policy="IfNotPresent",
         parameters=None,
-    ):
+    ) -> PipelineJob:
         """
         Runs the pipeline in Vertex AI Pipelines
         :param pipeline:
         :param image:
-        :param image_pull_policy:
+        :param parameters:
         :return:
         """
         with NamedTemporaryFile(
-            mode="rt", prefix="kedro-vertexai", suffix=".json"
+            mode="rt", prefix="kedro-vertexai", suffix=".yaml"
         ) as spec_output:
             self.compile(
                 pipeline,
                 image,
                 output=spec_output.name,
-                image_pull_policy=image_pull_policy,
             )
 
-            run = self.api_client.create_run_from_job_spec(
-                service_account=self.run_config.service_account,
-                job_spec_path=spec_output.name,
+            job = aip.PipelineJob(
+                display_name=self.run_name,
+                template_path=spec_output.name,
                 job_id=self.run_name,
                 pipeline_root=f"gs://{self.run_config.root}",
                 parameter_values=parameters or {},
                 enable_caching=False,
+            )
+
+            job.submit(
+                service_account=self.run_config.service_account,
                 network=self.run_config.network.vpc,
             )
-            self.log.debug("Run created %s", str(run))
 
-            return run
+            return job
 
     def _generate_run_name(self, config: PluginConfig):  # noqa
         return config.run_config.experiment_name.rstrip("-") + "-{}".format(
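In the rewritten `run_once` above, `AIPlatformClient.create_run_from_job_spec` gives way to constructing a `google.cloud.aiplatform.PipelineJob` and calling `submit()`. A sketch of how the plugin's config values map onto the new constructor arguments — the helper below is purely illustrative and not part of the commit:

```python
def build_pipeline_job_kwargs(run_name, template_path, root, parameters=None):
    # Mirrors the arguments run_once passes to aip.PipelineJob: the
    # compiled spec file becomes template_path, the configured GCS root
    # gains a gs:// prefix, and caching is disabled so all nodes re-run.
    return {
        "display_name": run_name,
        "template_path": template_path,
        "job_id": run_name,
        "pipeline_root": f"gs://{root}",
        "parameter_values": parameters or {},
        "enable_caching": False,
    }
```

Service account and VPC network are then supplied to `submit()` rather than the constructor, which is what lets the job be created and submitted in two steps.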
@@ -109,20 +96,16 @@ def compile(
         pipeline,
         image,
         output,
-        image_pull_policy="IfNotPresent",
     ):
         """
         Creates json file in given local output path
         :param pipeline:
         :param image:
         :param output:
-        :param image_pull_policy:
         :return:
         """
         token = os.getenv("MLFLOW_TRACKING_TOKEN", "")
-        pipeline_func = self.generator.generate_pipeline(
-            pipeline, image, image_pull_policy, token
-        )
+        pipeline_func = self.generator.generate_pipeline(pipeline, image, token)
         compiler.Compiler().compile(
             pipeline_func=pipeline_func,
             package_path=output,
@@ -170,7 +153,6 @@ def schedule(
                 pipeline,
                 self.run_config.image,
                 output=spec_output.name,
-                image_pull_policy=image_pull_policy,
             )
             self.api_client.create_schedule_from_job_spec(
                 job_spec_path=spec_output.name,
@@ -182,53 +164,3 @@
             )
 
         self.log.info("Pipeline scheduled to %s", cron_expression)
-
-    def wait_for_completion(
-        self,
-        max_timeout_seconds,
-        interval_seconds=30.0,
-        max_api_fails=5,
-    ) -> PipelineResult:
-        termination_statuses = (
-            PipelineStatus.PIPELINE_STATE_FAILED,
-            PipelineStatus.PIPELINE_STATE_SUCCEEDED,
-            PipelineStatus.PIPELINE_STATE_CANCELLED,
-        )
-
-        status_queue = Queue(1)
-
-        def monitor(q: Queue):
-            fails = 0
-            while fails < max_api_fails:
-                try:
-                    job = self.api_client.get_job(self.run_name)
-                    state = job["state"]
-                    if state in termination_statuses:
-                        q.put(
-                            PipelineResult(
-                                is_success=state
-                                == PipelineStatus.PIPELINE_STATE_SUCCEEDED,
-                                state=state,
-                                job_data=job,
-                            )
-                        )
-                        break
-                    else:
-                        self.log.info(f"Pipeline state: {state}")
-                except:  # noqa: E722
-                    fails += 1
-                    self.log.error(
-                        "Exception occurred while checking the pipeline status",
-                        exc_info=True,
-                    )
-                finally:
-                    sleep(interval_seconds)
-            else:
-                q.put(PipelineResult(is_success=False, state="Internal exception"))
-
-        thread = threading.Thread(target=monitor, daemon=True, args=(status_queue,))
-        thread.start()
-        try:
-            return status_queue.get(timeout=max_timeout_seconds)
-        except Empty:
-            return PipelineResult(False, f"Max timeout {max_timeout_seconds}s reached")
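The deleted `wait_for_completion` above implemented a poll-in-a-daemon-thread-with-timeout pattern, which kfp 2 replaces with a plain blocking `job.wait()` in `cli.py`. A stripped-down, self-contained sketch of that pattern with a caller-supplied status source — the function name and state strings here are illustrative, not the plugin's:

```python
import threading
import time
from queue import Empty, Queue


def wait_for_state(get_state, terminal_states, timeout_seconds, interval_seconds=0.01):
    """Poll get_state() from a daemon thread until it returns a terminal
    state; hand the caller that state, or None if the timeout elapses."""
    result_queue = Queue(maxsize=1)

    def monitor():
        while True:
            state = get_state()
            if state in terminal_states:
                result_queue.put(state)
                break
            time.sleep(interval_seconds)

    threading.Thread(target=monitor, daemon=True).start()
    try:
        # Queue.get with a timeout plays the role of max_timeout_seconds.
        return result_queue.get(timeout=timeout_seconds)
    except Empty:
        return None  # the old CLI mapped this case to a non-zero exit code
```

Because the monitor runs as a daemon thread, a timed-out wait does not keep the process alive — which is exactly why, as the changelog notes, timing out never altered the remote pipeline execution.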

kedro_vertexai/config.py

−5

@@ -15,10 +15,6 @@
     # Name of the image to run as the pipeline steps
     image: {image}
 
-    # Pull policy to be used for the steps. Use Always if you push the images
-    # on the same tag, or Never if you use only local images
-    image_pull_policy: IfNotPresent
-
     # Location of Vertex AI GCS root
     root: bucket_name/gcs_suffix
 
@@ -199,7 +195,6 @@ class MLFlowVertexAIConfig(BaseModel):
 
 class RunConfig(BaseModel):
     image: str
-    image_pull_policy: Optional[str] = "IfNotPresent"
     root: Optional[str]
     description: Optional[str]
     experiment_name: str

kedro_vertexai/data_models.py

-22
This file was deleted.
