Skip to content

Commit

Permalink
adapt tests to standalone KFP
Browse files Browse the repository at this point in the history
  • Loading branch information
JoaquinRivesGambin committed Apr 19, 2024
1 parent 6a47f97 commit 5392e47
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 41 deletions.
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from dotenv import load_dotenv
import os

from .utils import parse_bool

ENV_FILE = pathlib.Path(__file__).parent.parent / ".platform/.config"
assert ENV_FILE.exists(), f"File not found: {ENV_FILE} (autogenerated by the platform on installation)" # noqa
load_dotenv(dotenv_path=ENV_FILE)
Expand All @@ -26,6 +28,9 @@
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
assert AWS_SECRET_ACCESS_KEY is not None

# True when the platform was deployed as standalone Kubeflow Pipelines
# (DEPLOYMENT_OPTION mentions "kfp") rather than full Kubeflow.
# Default to "" so a missing variable means "not standalone" instead of
# raising TypeError from `"kfp" in None`.
IS_STANDALONE_KFP = "kfp" in os.environ.get('DEPLOYMENT_OPTION', '')
# Skip the registry tests unless a local image registry was installed.
# Default to 'false' so parse_bool never receives None when the variable
# is absent from the environment.
SKIP_LOCAL_REGISTRY = not parse_bool(os.environ.get('INSTALL_LOCAL_REGISTRY', 'false'))


def pytest_sessionstart(session):
"""
Expand Down
80 changes: 55 additions & 25 deletions tests/test_kfp.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import subprocess
import logging
import pathlib
Expand All @@ -9,7 +10,7 @@
import requests
from urllib.parse import urlsplit

from .conftest import CLUSTER_NAME
from .conftest import CLUSTER_NAME, IS_STANDALONE_KFP

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand All @@ -22,7 +23,7 @@
KUBEFLOW_ENDPOINT = "http://localhost:8080"
KUBEFLOW_USERNAME = "[email protected]"
KUBEFLOW_PASSWORD = "12341234"
NAMESPACE = "kubeflow-user-example-com"
KUBEFLOW_USER_NAMESPACE = "kubeflow-user-example-com"


def get_istio_auth_session(url: str, username: str, password: str) -> dict:
Expand All @@ -44,10 +45,8 @@ def get_istio_auth_session(url: str, username: str, password: str) -> dict:
"is_secured": None, # True if KF endpoint is secured
"session_cookie": None # Resulting session cookies in the form "key1=value1; key2=value2"
}

# use a persistent session (for cookies)
with requests.Session() as s:

################
# Determine if Endpoint is Secured
################
Expand All @@ -56,7 +55,6 @@ def get_istio_auth_session(url: str, username: str, password: str) -> dict:
raise RuntimeError(
f"HTTP status code '{resp.status_code}' for GET against: {url}"
)

auth_session["redirect_url"] = resp.url

# if we were NOT redirected, then the endpoint is UNSECURED
Expand Down Expand Up @@ -101,7 +99,6 @@ def get_istio_auth_session(url: str, username: str, password: str) -> dict:
f"HTTP status code '{resp.status_code}' "
f"for GET against: {redirect_url_obj.geturl()}"
)

# set the login url
auth_session["dex_login_url"] = resp.url

Expand All @@ -118,7 +115,6 @@ def get_istio_auth_session(url: str, username: str, password: str) -> dict:
f"Login credentials were probably invalid - "
f"No redirect after POST to: {auth_session['dex_login_url']}"
)

# store the session cookies in a "key1=value1; key2=value2" string
auth_session["session_cookie"] = "; ".join(
[f"{c.name}={c.value}" for c in s.cookies]
Expand All @@ -128,43 +124,37 @@ def get_istio_auth_session(url: str, username: str, password: str) -> dict:


def run_pipeline(pipeline_file: str, experiment_name: str):

with subprocess.Popen(["kubectl", "-n", "istio-system", "port-forward", "svc/istio-ingressgateway", "8080:80"], stdout=True) as proc:
"""Run a pipeline on a Kubeflow cluster."""
with subprocess.Popen(["kubectl", "-n", "istio-system", "port-forward", "svc/istio-ingressgateway", "8080:80"], stdout=True) as proc: # noqa: E501
try:
time.sleep(2) # give some time to the port-forward connection

auth_session = get_istio_auth_session(
url=KUBEFLOW_ENDPOINT,
username=KUBEFLOW_USERNAME,
password=KUBEFLOW_PASSWORD
)

client = kfp.Client(
host=f"{KUBEFLOW_ENDPOINT}/pipeline",
cookies=auth_session["session_cookie"],
namespace=NAMESPACE,
namespace=KUBEFLOW_USER_NAMESPACE,
)

created_run = client.create_run_from_pipeline_package(
pipeline_file=pipeline_file,
enable_caching=False,
arguments={},
run_name="kfp_test_run",
experiment_name=experiment_name,
namespace=NAMESPACE
namespace=KUBEFLOW_USER_NAMESPACE
)

run_id = created_run.run_id

logger.info(f"Submitted run with ID: {run_id}")

logger.info(f"Waiting for run {run_id} to complete....")
run_detail = created_run.wait_for_run_completion()
_handle_job_end(run_detail)

# clean up
experiment = client.get_experiment(
experiment_name=experiment_name, namespace=NAMESPACE
experiment_name=experiment_name, namespace=KUBEFLOW_USER_NAMESPACE
)
client.delete_experiment(experiment.id)
logger.info("Done")
Expand All @@ -176,16 +166,46 @@ def run_pipeline(pipeline_file: str, experiment_name: str):
proc.terminate()


def run_pipeline_standalone_kfp(pipeline_file: str, experiment_name: str):
    """Run a pipeline on a standalone Kubeflow Pipelines cluster.

    Port-forwards the ml-pipeline-ui service, submits the pipeline,
    blocks until the run finishes, then deletes the experiment.
    """
    port_forward_cmd = [
        "kubectl", "-n", "kubeflow",
        "port-forward", "svc/ml-pipeline-ui", "8080:80",
    ]
    with subprocess.Popen(port_forward_cmd, stdout=True) as port_forward:
        try:
            # let the port-forward tunnel come up before connecting
            time.sleep(2)

            # standalone KFP has no Dex/Istio auth, so a plain client suffices
            client = kfp.Client(
                host=f"{KUBEFLOW_ENDPOINT}/pipeline",
            )
            submitted = client.create_run_from_pipeline_package(
                pipeline_file=pipeline_file,
                enable_caching=False,
                arguments={},
                run_name="kfp_test_run",
                experiment_name=experiment_name,
            )
            run_id = submitted.run_id
            logger.info(f"Submitted run with ID: {run_id}")
            logger.info(f"Waiting for run {run_id} to complete....")
            result = submitted.wait_for_run_completion()
            _handle_job_end(result)

            # clean up the experiment created for this test run
            exp = client.get_experiment(experiment_name=experiment_name)
            client.delete_experiment(exp.id)
            logger.info("Done")

        except Exception as e:
            logger.error(f"ERROR: {e}")
            raise e
        finally:
            # always tear the tunnel down, even on failure
            port_forward.terminate()


def _handle_job_end(run_detail):
finished_run = run_detail.to_dict()["run"]

created_at = finished_run["created_at"]
finished_at = finished_run["finished_at"]

duration_secs = (finished_at - created_at).total_seconds()

status = finished_run["status"]

logger.info(f"Run finished in {round(duration_secs)} seconds with status: {status}")

if status != "Succeeded":
Expand All @@ -196,7 +216,6 @@ def build_load_image():
output = subprocess.check_output(
["docker", "exec", f"{CLUSTER_NAME}-control-plane", "crictl", "images"]
)

if IMAGE_NAME in output.decode():
logging.info(f"Image already in cluster.")
else:
Expand All @@ -206,14 +225,25 @@ def build_load_image():

@pytest.mark.order(6)
@pytest.mark.timeout(240)
@pytest.mark.skipif(IS_STANDALONE_KFP, reason="It is not Kubeflow")
def test_run_pipeline():
    """Smoke-test a pipeline run against a full Kubeflow deployment."""
    # make sure the base docker image is present inside the cluster
    build_load_image()

    # submit the pipeline and wait for it to finish
    run_pipeline(
        pipeline_file=str(PIPELINE_FILE),
        experiment_name=EXPERIMENT_NAME,
    )


@pytest.mark.order(6)
@pytest.mark.timeout(240)
@pytest.mark.skipif(not IS_STANDALONE_KFP, reason="It is not standalone KFP")
def test_run_pipeline_standalone_kfp():
    """Smoke-test a pipeline run against a standalone KFP deployment."""
    # make sure the base docker image is present inside the cluster
    build_load_image()

    # submit the pipeline and wait for it to finish
    run_pipeline_standalone_kfp(
        pipeline_file=str(PIPELINE_FILE),
        experiment_name=EXPERIMENT_NAME,
    )


# Allow running this module directly without pytest; exercises the
# full-Kubeflow variant only.
if __name__ == "__main__":
    test_run_pipeline()
39 changes: 23 additions & 16 deletions tests/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@
import logging
import pathlib
import pytest
import os
from envsubst import envsubst

from .conftest import HOST_IP
from .test_kfp import run_pipeline
from .conftest import HOST_IP, IS_STANDALONE_KFP, SKIP_LOCAL_REGISTRY
from .test_kfp import run_pipeline, run_pipeline_standalone_kfp

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

BUILD_FILE = pathlib.Path(__file__).parent / "resources" / "registry" / "build_push_image.sh"
PIPELINE_TEMPLATE = pathlib.Path(__file__).parent / "resources" / "registry" / "pipeline.yaml.template"
BUILD_FILE = pathlib.Path(__file__).parent / "resources" / "registry" / "build_push_image.sh" # noqa
PIPELINE_TEMPLATE = pathlib.Path(__file__).parent / "resources" / "registry" / "pipeline.yaml.template" # noqa

IMAGE_NAME = "kfp-registry-test-image"
EXPERIMENT_NAME = "Test Experiment (Registry)"
Expand All @@ -32,29 +31,37 @@ def render_pipeline_yaml(output: str):


@pytest.mark.order(7)
@pytest.mark.skipif(SKIP_LOCAL_REGISTRY, reason="No local image registry was installed")
def test_push_image():
    """Build the test docker image and push it to the local registry."""
    build_push_image()


@pytest.mark.order(8)
@pytest.mark.timeout(120)
@pytest.mark.skipif(SKIP_LOCAL_REGISTRY, reason="No local image registry was installed")
@pytest.mark.skipif(IS_STANDALONE_KFP, reason="It is not Kubeflow")
def test_run_pipeline_using_registry(tmp_path):
    """Run a pipeline that pulls its image from the local registry (Kubeflow)."""
    # build the docker image and push it to the local registry
    build_push_image()

    # render pipeline.yaml with the registry's IP address substituted in
    rendered = tmp_path / "pipeline.yaml"
    render_pipeline_yaml(output=str(rendered))

    # submit the pipeline and wait for completion
    run_pipeline(
        pipeline_file=str(rendered),
        experiment_name=EXPERIMENT_NAME,
    )


@pytest.mark.order(8)
@pytest.mark.timeout(120)
@pytest.mark.skipif(SKIP_LOCAL_REGISTRY, reason="No local image registry was installed")
@pytest.mark.skipif(not IS_STANDALONE_KFP, reason="It is not standalone KFP")
def test_run_pipeline_standalone_kfp_using_registry(tmp_path):
    """Run a pipeline that pulls its image from the local registry (standalone KFP)."""
    # build the docker image and push it to the local registry
    build_push_image()

    # render pipeline.yaml with the registry's IP address substituted in
    rendered = tmp_path / "pipeline.yaml"
    render_pipeline_yaml(output=str(rendered))

    # submit the pipeline and wait for completion
    run_pipeline_standalone_kfp(
        pipeline_file=str(rendered),
        experiment_name=EXPERIMENT_NAME,
    )
16 changes: 16 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import Union


def parse_bool(val: Union[str, bool]) -> bool:
    """Convert a string representation of truth to True or False.

    True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
    are 'n', 'no', 'f', 'false', 'off', and '0'. Matching is
    case-insensitive. Boolean inputs are returned unchanged.

    Raises:
        ValueError: if ``val`` is not a recognised truth value.
    """
    # The signature accepts bool, but str.lower() would raise
    # AttributeError on one — short-circuit before parsing.
    if isinstance(val, bool):
        return val
    val = val.lower()
    # (True/False literals in these tuples were dead: after .lower() the
    # value is always a str, so only string members can match.)
    if val in ('y', 'yes', 't', 'true', 'on', '1'):
        return True
    elif val in ('n', 'no', 'f', 'false', 'off', '0'):
        return False
    else:
        raise ValueError(f"Invalid truth value {val}")

0 comments on commit 5392e47

Please sign in to comment.