diff --git a/CHANGELOG.md b/CHANGELOG.md index 993e1c49..f3904f88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,18 @@ # Change Log +## Release 1.3.4 (11/14/23) + +### Changed + +- Logs are now JSON formatted +- Exposed logging `job_id` now `request_id` + +### Added + +- `get_endpoints` exposed to return all endpoints for a given user + +--- + ## Release 1.3.3 (11/8/23) ### Added diff --git a/examples/api/get_endpoints.py b/examples/api/get_endpoints.py new file mode 100644 index 00000000..1cb6490f --- /dev/null +++ b/examples/api/get_endpoints.py @@ -0,0 +1,8 @@ +""" Get all endpoints from the API """ + + +import runpod + +endpoints = runpod.get_endpoints() + +print(endpoints) diff --git a/examples/serverless/logger.py b/examples/serverless/logger.py index defb3d58..67e74967 100644 --- a/examples/serverless/logger.py +++ b/examples/serverless/logger.py @@ -18,10 +18,10 @@ # ERROR | An error message -log.debug('A debug message', job_id=JOB_ID) -log.info('An info message', job_id=JOB_ID) -log.warn('A warning message', job_id=JOB_ID) -log.error('An error message', job_id=JOB_ID) +log.debug('A debug message', request_id=JOB_ID) +log.info('An info message', request_id=JOB_ID) +log.warn('A warning message', request_id=JOB_ID) +log.error('An error message', request_id=JOB_ID) # Output: # {"requestId": "1234567890", "message": "A debug message", "level": "DEBUG"} diff --git a/runpod/__init__.py b/runpod/__init__.py index c084a037..fbf30e1b 100644 --- a/runpod/__init__.py +++ b/runpod/__init__.py @@ -13,7 +13,7 @@ get_gpu, get_gpus, get_pod, get_pods, create_pod, stop_pod, resume_pod, terminate_pod, create_template, - create_endpoint + get_endpoints, create_endpoint, update_endpoint_template ) from .cli.groups.config.functions import set_credentials, check_credentials, get_credentials diff --git a/runpod/api/ctl_commands.py b/runpod/api/ctl_commands.py index 8f681b44..3d2e2803 100644 --- a/runpod/api/ctl_commands.py +++ b/runpod/api/ctl_commands.py @@ -9,6 +9,7 @@ from .mutations import user as user_mutations from .queries import gpus from .queries import pods as pod_queries +from .queries import endpoints as endpoint_queries from .graphql import run_graphql_query from .mutations import pods as pod_mutations from .mutations import endpoints as endpoint_mutations @@ -228,6 +229,14 @@ def create_template( return raw_response["data"]["saveTemplate"] +def get_endpoints() -> dict: + ''' + Get all endpoints + ''' + raw_return = run_graphql_query(endpoint_queries.QUERY_ENDPOINT) + cleaned_return = raw_return["data"]["myself"]["endpoints"] + return cleaned_return + def create_endpoint( name:str, template_id:str, gpu_ids:str="AMPERE_16", network_volume_id:str=None, locations:str=None, @@ -262,3 +271,25 @@ def create_endpoint( ) return raw_response["data"]["saveEndpoint"] + + +def update_endpoint_template( + endpoint_id:str, template_id:str +): + ''' + Update an endpoint template + + :param endpoint_id: the id of the endpoint + :param template_id: the id of the template to use for the endpoint + + :example: + + >>> endpoint_id = runpod.update_endpoint_template("test", "template_id") + ''' + raw_response = run_graphql_query( + endpoint_mutations.update_endpoint_template_mutation( + endpoint_id, template_id + ) + ) + + return raw_response["data"]["updateEndpointTemplate"] diff --git a/runpod/api/mutations/endpoints.py b/runpod/api/mutations/endpoints.py index 503e94dc..2c6d8ae6 100644 --- a/runpod/api/mutations/endpoints.py +++ b/runpod/api/mutations/endpoints.py @@ -2,11 +2,12 @@ # pylint: disable=too-many-arguments + def generate_endpoint_mutation( - name:str, template_id:str, gpu_ids:str="AMPERE_16", - network_volume_id:str=None, locations:str=None, - idle_timeout:int=5, scaler_type:str="QUEUE_DELAY", scaler_value:int=4, - workers_min:int=0, workers_max:int=3 + name: str, template_id: str, gpu_ids: str = "AMPERE_16", + network_volume_id: str = None, locations: str = None, + idle_timeout: int = 5, scaler_type: str = "QUEUE_DELAY", scaler_value: int = 4, + workers_min: int = 0, workers_max: int = 3 ): """ Generate a string for a GraphQL mutation to create a new endpoint. """ input_fields = [] @@ -57,3 +58,26 @@ def generate_endpoint_mutation( }} }} """ + + +def update_endpoint_template_mutation( + endpoint_id: str, template_id: str +): + """ Generate a string for a GraphQL mutation to update an existing endpoint's template. """ + input_fields = [] + + # ------------------------------ Required Fields ----------------------------- # + input_fields.append(f'templateId: "{template_id}"') + input_fields.append(f'endpointId: "{endpoint_id}"') + + # Format the input fields into a string + input_fields_string = ", ".join(input_fields) + result = f""" + mutation {{ + updateEndpointTemplate(input: {{{input_fields_string}}}) {{ + id + templateId + }} + }} + """ + return result diff --git a/runpod/api/queries/endpoints.py b/runpod/api/queries/endpoints.py new file mode 100644 index 00000000..e262d760 --- /dev/null +++ b/runpod/api/queries/endpoints.py @@ -0,0 +1,36 @@ +""" GraphQL queries for endpoints. """ + +QUERY_ENDPOINT = """ +query Query { + myself { + endpoints { + aiKey + gpuIds + id + idleTimeout + name + networkVolumeId + locations + scalerType + scalerValue + templateId + type + userId + version + workersMax + workersMin + workersStandby + gpuCount + env { + key + value + } + createdAt + networkVolume { + id + dataCenterId + } + } + } +} +""" diff --git a/runpod/cli/groups/project/functions.py b/runpod/cli/groups/project/functions.py index fbc30ff6..053f98fd 100644 --- a/runpod/cli/groups/project/functions.py +++ b/runpod/cli/groups/project/functions.py @@ -5,21 +5,59 @@ import os import sys import uuid +from datetime import datetime import tomlkit from tomlkit import document, comment, table, nl -from runpod import __version__, get_pod, create_template, create_endpoint +from runpod import __version__, get_pod, create_template, create_endpoint, update_endpoint_template from runpod.cli import BASE_DOCKER_IMAGE, GPU_TYPES, ENV_VARS from runpod.cli.utils.ssh_cmd import SSHConnection -from .helpers import get_project_pod, copy_template_files, attempt_pod_launch, load_project_config +from .helpers import ( + get_project_pod, copy_template_files, + attempt_pod_launch, load_project_config, get_project_endpoint +) from ...utils.rp_sync import sync_directory STARTER_TEMPLATES = os.path.join(os.path.dirname(__file__), 'starter_templates') -# -------------------------------- New Project ------------------------------- # + +def _launch_dev_pod(): + """ Launch a development pod. """ + config = load_project_config() # Load runpod.toml + + print("Deploying development pod on RunPod...") + + # Prepare the environment variables + environment_variables = {"RUNPOD_PROJECT_ID": config["project"]["uuid"]} + for variable in config['project'].get('env_vars', {}): + environment_variables[variable] = config['project']['env_vars'][variable] + + # Prepare the GPU types + selected_gpu_types = config['project'].get('gpu_types', []) + if config['project'].get('gpu', None): + selected_gpu_types.append(config['project']['gpu']) + + # Attempt to launch a pod with the given configuration + new_pod = attempt_pod_launch(config, environment_variables) + if new_pod is None: + print("Selected GPU types unavailable, try again later or use a different type.") + return None + + print("Waiting for pod to come online... ", end="") + sys.stdout.flush() + + # Wait for the pod to come online + while new_pod.get('desiredStatus', None) != 'RUNNING' or new_pod.get('runtime') is None: + new_pod = get_pod(new_pod['id']) + + project_pod_id = new_pod['id'] + + print(f"Project {config['project']['name']} pod ({project_pod_id}) created.", end="\n\n") + return project_pod_id +# -------------------------------- New Project ------------------------------- # def create_new_project(project_name, runpod_volume_id, cuda_version, python_version, # pylint: disable=too-many-locals, too-many-arguments, too-many-statements model_type=None, model_name=None, init_current_dir=False): """ Create a new project. """ @@ -124,63 +162,40 @@ def start_project(): # pylint: disable=too-many-locals, too-many-branches # Check if the project pod already exists, if not create it. if not project_pod_id: + project_pod_id = _launch_dev_pod() - print("Deploying development pod on RunPod...") - - # Prepare the environment variables - environment_variables = {"RUNPOD_PROJECT_ID": config["project"]["uuid"]} - for variable in config['project'].get('env_vars', {}): - environment_variables[variable] = config['project']['env_vars'][variable] - - # Prepare the GPU types - selected_gpu_types = config['project'].get('gpu_types', []) - if config['project'].get('gpu', None): - selected_gpu_types.append(config['project']['gpu']) - - # Attempt to launch a pod with the given configuration - new_pod = attempt_pod_launch(config, environment_variables) - if new_pod is None: - print("Selected GPU types unavailable, try again later or use a different type.") - return - - print("Waiting for pod to come online... ", end="") - sys.stdout.flush() - - # Wait for the pod to come online - while new_pod.get('desiredStatus', None) != 'RUNNING' or new_pod.get('runtime') is None: - new_pod = get_pod(new_pod['id']) - - project_pod_id = new_pod['id'] - - print(f"Project {config['project']['name']} pod ({project_pod_id}) created.", end="\n\n") + if project_pod_id is None: + return with SSHConnection(project_pod_id) as ssh_conn: project_path_uuid = f'{config["project"]["volume_mount_path"]}/{config["project"]["uuid"]}' - remote_project_path = os.path.join(project_path_uuid, config["project"]["name"]) + project_path_uuid_dev = os.path.join(project_path_uuid, 'dev') + project_path_uuid_prod = os.path.join(project_path_uuid, 'prod') + remote_project_path = os.path.join(project_path_uuid_dev, config["project"]["name"]) # Create the project folder on the pod print(f'Checking pod project folder: {remote_project_path} on pod {project_pod_id}') - ssh_conn.run_commands([f'mkdir -p {remote_project_path}']) + ssh_conn.run_commands([f'mkdir -p {remote_project_path} {project_path_uuid_prod}']) # Copy local files to the pod project folder print(f'Syncing files to pod {project_pod_id}') - ssh_conn.rsync(os.getcwd(), project_path_uuid) + ssh_conn.rsync(os.getcwd(), project_path_uuid_dev) # Create the virtual environment - venv_path = os.path.join(project_path_uuid, "venv") + venv_path = os.path.join(project_path_uuid_dev, "venv") print(f'Activating Python virtual environment: {venv_path} on pod {project_pod_id}') commands = [ f'python{config["runtime"]["python_version"]} -m venv {venv_path}', f'source {venv_path}/bin/activate &&' f'cd {remote_project_path} &&' 'python -m pip install --upgrade pip &&' - f'python -m pip install --requirement {config["runtime"]["requirements_path"]}' + f'python -m pip install -v --requirement {config["runtime"]["requirements_path"]}' ] ssh_conn.run_commands(commands) # Start the watcher and then start the API development server - sync_directory(ssh_conn, os.getcwd(), project_path_uuid) + sync_directory(ssh_conn, os.getcwd(), project_path_uuid_dev) project_name = config["project"]["name"] pip_req_path = os.path.join(remote_project_path, config['runtime']['requirements_path']) @@ -217,14 +232,14 @@ def start_project(): # pylint: disable=too-many-locals, too-many-branches trap cleanup EXIT SIGINT - if source {project_path_uuid}/venv/bin/activate; then + if source {project_path_uuid_dev}/venv/bin/activate; then echo -e "- Activated virtual environment." else echo "Failed to activate virtual environment." exit 1 fi - if cd {project_path_uuid}/{project_name}; then + if cd {project_path_uuid_dev}/{project_name}; then echo -e "- Changed to project directory." else echo "Failed to change directory." @@ -287,35 +302,74 @@ def start_project(): # pylint: disable=too-many-locals, too-many-branches # ------------------------------ Deploy Project ------------------------------ # def create_project_endpoint(): """ Create a project endpoint. + - Move code in dev to prod folder + - TODO: git commit the diff from current state to new state - Create a serverless template for the project - Create a new endpoint using the template """ config = load_project_config() + project_pod_id = get_project_pod(config['project']['uuid']) + + # Check if the project pod already exists, if not create it. + if not project_pod_id: + project_pod_id = _launch_dev_pod() + + if project_pod_id is None: + return None + + with SSHConnection(project_pod_id) as ssh_conn: + project_path_uuid = f'{config["project"]["volume_mount_path"]}/{config["project"]["uuid"]}' + project_path_uuid_prod = os.path.join(project_path_uuid, 'prod') + remote_project_path = os.path.join(project_path_uuid_prod, config["project"]["name"]) + + # Copy local files to the pod project folder + ssh_conn.run_commands([f'mkdir -p {remote_project_path}']) + print(f'Syncing files to pod {project_pod_id} prod') + ssh_conn.rsync(os.getcwd(), project_path_uuid_prod) + + # Create the virtual environment + venv_path = os.path.join(project_path_uuid_prod, 'venv') + print(f'Activating Python virtual environment: {venv_path} on pod {project_pod_id}') + commands = [ + f'python{config["runtime"]["python_version"]} -m venv {venv_path}', + f'source {venv_path}/bin/activate &&' + f'cd {remote_project_path} &&' + 'python -m pip install --upgrade pip &&' + f'python -m pip install -v --requirement {config["runtime"]["requirements_path"]}' + ] + ssh_conn.run_commands(commands) + ssh_conn.close() environment_variables = {} for variable in config['project']['env_vars']: environment_variables[variable] = config['project']['env_vars'][variable] # Construct the docker start command - docker_start_cmd_prefix = 'bash -c "' - activate_cmd = f'. /runpod-volume/{config["project"]["uuid"]}/venv/bin/activate' - python_cmd = f'python -u /runpod-volume/{config["project"]["uuid"]}/{config["project"]["name"]}/{config["runtime"]["handler_path"]}' # pylint: disable=line-too-long - docker_start_cmd_suffix = '"' - docker_start_cmd = docker_start_cmd_prefix + activate_cmd + ' && ' + \ - python_cmd + docker_start_cmd_suffix # pylint: disable=line-too-long + activate_cmd = f'. /runpod-volume/{config["project"]["uuid"]}/prod/venv/bin/activate' + python_cmd = f'python -u /runpod-volume/{config["project"]["uuid"]}/prod/{config["project"]["name"]}/{config["runtime"]["handler_path"]}' # pylint: disable=line-too-long + docker_start_cmd = 'bash -c "' + activate_cmd + ' && ' + python_cmd + '"' project_endpoint_template = create_template( - name=f'{config["project"]["name"]}-endpoint | {config["project"]["uuid"]}', + name=f'{config["project"]["name"]}-endpoint | {config["project"]["uuid"]} | {datetime.now()}', # pylint: disable=line-too-long image_name=config['project']['base_image'], container_disk_in_gb=config['project']['container_disk_size_gb'], docker_start_cmd=docker_start_cmd, env=environment_variables, is_serverless=True ) - deployed_endpoint = create_endpoint( - name=f'{config["project"]["name"]}-endpoint | {config["project"]["uuid"]}', - template_id=project_endpoint_template['id'], - network_volume_id=config['project']['storage_id'], - ) + deployed_endpoint = get_project_endpoint(config['project']['uuid']) + if not deployed_endpoint: + deployed_endpoint = create_endpoint( + name=f'{config["project"]["name"]}-endpoint | {config["project"]["uuid"]}', + template_id=project_endpoint_template['id'], + network_volume_id=config['project']['storage_id'], + ) + else: + deployed_endpoint = update_endpoint_template( + endpoint_id=deployed_endpoint['id'], + template_id=project_endpoint_template['id'], + ) + + # does user want to tear down and recreate workers immediately? return deployed_endpoint['id'] diff --git a/runpod/cli/groups/project/helpers.py b/runpod/cli/groups/project/helpers.py index 0e8e33c1..e62c4f32 100644 --- a/runpod/cli/groups/project/helpers.py +++ b/runpod/cli/groups/project/helpers.py @@ -7,9 +7,10 @@ import click import tomlkit -from runpod import get_pods, create_pod +from runpod import get_pods, create_pod, get_endpoints from runpod import error as rp_error + def validate_project_name(name): ''' Validate the project name. @@ -19,6 +20,7 @@ def validate_project_name(name): raise click.BadParameter(f"Project name contains an invalid character: '{match.group()}'.") return name + def get_project_pod(project_id: str): """Check if a project pod exists. Return the pod_id if it exists, else return None. @@ -29,6 +31,18 @@ def get_project_pod(project_id: str): return None + +def get_project_endpoint(project_id: str): + """Check if a project endpoint exists. + Return the endpoint if it exists, else return None. + """ + for endpoint in get_endpoints(): + if project_id in endpoint['name']: + return endpoint + + return None + + def copy_template_files(template_dir, destination): """Copy the template files to the destination directory.""" for item in os.listdir(template_dir): @@ -39,6 +53,7 @@ def copy_template_files(template_dir, destination): else: shutil.copy2(source_item, destination_item) + def attempt_pod_launch(config, environment_variables): """Attempt to launch a pod with the given configuration.""" for gpu_type in config['project'].get('gpu_types', []): @@ -62,6 +77,7 @@ def attempt_pod_launch(config, environment_variables): print("Unavailable.") return None + def load_project_config(): """Load the project config file.""" project_config_file = os.path.join(os.getcwd(), 'runpod.toml') diff --git a/runpod/cli/utils/ssh_cmd.py b/runpod/cli/utils/ssh_cmd.py index 5225e772..e23ca6ec 100644 --- a/runpod/cli/utils/ssh_cmd.py +++ b/runpod/cli/utils/ssh_cmd.py @@ -131,6 +131,7 @@ def rsync(self, local_path, remote_path, quiet=False): return subprocess.run(rsync_cmd, check=True) + def close(self): ''' Close the SSH connection. ''' self.ssh.close() diff --git a/runpod/serverless/modules/rp_job.py b/runpod/serverless/modules/rp_job.py index 411d6874..3498bacb 100644 --- a/runpod/serverless/modules/rp_job.py +++ b/runpod/serverless/modules/rp_job.py @@ -70,7 +70,7 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any] continue next_job = await response.json() - log.debug(f"Received Job | {next_job}") + log.debug(f"Request Received | {next_job}") # Check if the job is valid job_id = next_job.get("id", None) @@ -94,11 +94,11 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any] if not retry: return None - log.debug(f"{next_job['id']} | Valid Job Confirmed") + log.debug("Confirmed valid request.", next_job['id']) if next_job: job_list.add_job(next_job["id"]) - log.debug(f"{next_job['id']} | Added Job ID") + log.debug("Request ID added.", next_job['id']) return next_job @@ -108,14 +108,14 @@ async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]: Run the job using the handler. Returns the job output or error. """ - log.info(f'{job["id"]} | Started') + log.info('Started', job["id"]) run_result = {"error": "No output from handler."} try: handler_return = handler(job) job_output = await handler_return if inspect.isawaitable(handler_return) else handler_return - log.debug(f'{job["id"]} | Handler output: {job_output}') + log.debug(f'Handler output: {job_output}', job["id"]) if isinstance(job_output, dict): error_msg = job_output.pop("error", None) @@ -149,12 +149,12 @@ async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]: "runpod_version": runpod_version } - log.error(f'{job["id"]} | Captured Handler Exception') + log.error('Captured Handler Exception', job["id"]) log.error(json.dumps(error_info, indent=4)) run_result = {"error": json.dumps(error_info)} finally: - log.debug(f'{job["id"]} | run_job return: {run_result}') + log.debug(f'run_job return: {run_result}', job["id"]) return run_result @@ -175,7 +175,7 @@ async def run_job_generator( for output_partial in job_output: yield {"output": output_partial} except Exception as err: # pylint: disable=broad-except - log.error(f'Error while running job {job["id"]}: {err}') + log.error(err, job["id"]) yield {"error": f"handler: {str(err)} \ntraceback: {traceback.format_exc()}"} finally: - log.info(f'{job["id"]} | Finished ') + log.info('Finished', job["id"]) diff --git a/runpod/serverless/modules/rp_logger.py b/runpod/serverless/modules/rp_logger.py index 3811bb8a..86725050 100644 --- a/runpod/serverless/modules/rp_logger.py +++ b/runpod/serverless/modules/rp_logger.py @@ -71,7 +71,7 @@ def log(self, message, message_level='INFO', job_id=None): if level_index > LOG_LEVELS.index(message_level) and message_level != 'TIP': return - if job_id: + if os.environ.get('RUNPOD_ENDPOINT_ID'): log_json = { 'requestId': job_id, 'message': message, @@ -80,6 +80,9 @@ def log(self, message, message_level='INFO', job_id=None): print(json.dumps(log_json), flush=True) return + if job_id: + message = f'{job_id} | {message}' + print(f'{message_level.ljust(7)}| {message}', flush=True) return @@ -92,29 +95,29 @@ def secret(self, secret_name, secret): redacted_secret = secret[0] + '*' * (len(secret)-2) + secret[-1] self.info(f"{secret_name}: {redacted_secret}") - def debug(self, message, job_id=None): + def debug(self, message, request_id=None): ''' debug log ''' - self.log(message, 'DEBUG', job_id) + self.log(message, 'DEBUG', request_id) - def info(self, message, job_id=None): + def info(self, message, request_id=None): ''' info log ''' - self.log(message, 'INFO', job_id) + self.log(message, 'INFO', request_id) - def warn(self, message, job_id=None): + def warn(self, message, request_id=None): ''' warn log ''' - self.log(message, 'WARN', job_id) + self.log(message, 'WARN', request_id) - def error(self, message, job_id=None): + def error(self, message, request_id=None): ''' error log ''' - self.log(message, 'ERROR', job_id) + self.log(message, 'ERROR', request_id) def tip(self, message): ''' diff --git a/runpod/serverless/worker.py b/runpod/serverless/worker.py index f8694863..13e6d019 100644 --- a/runpod/serverless/worker.py +++ b/runpod/serverless/worker.py @@ -63,7 +63,7 @@ async def _process_job(job, session, job_scaler, config): # If refresh_worker is set, pod will be reset after job is complete. if config.get("refresh_worker", False): - log.info(f"refresh_worker | Flag set, stopping pod after job {job['id']}.") + log.info("refresh_worker flag set, stopping pod after job.", job['id']) job_result["stopPod"] = True job_scaler.kill_worker() diff --git a/tests/test_api/test_ctl_commands.py b/tests/test_api/test_ctl_commands.py index 5fe311b4..68c0f482 100644 --- a/tests/test_api/test_ctl_commands.py +++ b/tests/test_api/test_ctl_commands.py @@ -6,6 +6,7 @@ from runpod.api import ctl_commands + class TestCTL(unittest.TestCase): ''' Tests for CTL Commands ''' @@ -94,18 +95,17 @@ def test_get_gpu(self): with self.assertRaises(ValueError) as context: gpu = ctl_commands.get_gpu("Not a GPU") - self.assertEqual(str(context.exception), - "No GPU found with the specified ID, " - "run runpod.get_gpus() to get a list of all GPUs") + "No GPU found with the specified ID, " + "run runpod.get_gpus() to get a list of all GPUs") def test_create_pod(self): ''' Tests create_pod ''' with patch("runpod.api.graphql.requests.post") as patch_request, \ - patch("runpod.api.ctl_commands.get_gpu") as patch_get_gpu, \ - patch("runpod.api.ctl_commands.get_user") as patch_get_user: + patch("runpod.api.ctl_commands.get_gpu") as patch_get_gpu, \ + patch("runpod.api.ctl_commands.get_user") as patch_get_user: patch_request.return_value.json.return_value = { "data": { @@ -145,7 +145,7 @@ def test_create_pod(self): network_volume_id="NETWORK_VOLUME_ID") self.assertEqual(str(context.exception), - "cloud_type must be one of ALL, COMMUNITY or SECURE") + "cloud_type must be one of ALL, COMMUNITY or SECURE") def test_stop_pod(self): ''' @@ -215,7 +215,6 @@ def test_raised_error(self): self.assertEqual(str(context.exception), "Error Message") - # Test Unauthorized with status code 401 with patch("runpod.api.graphql.requests.post") as patch_request: patch_request.return_value.status_code = 401 @@ -256,7 +255,7 @@ def test_get_pods(self): "vcpuCount": 21, "volumeInGb": 200, "volumeMountPath": "/workspace", - "machine": { "gpuDisplayName": "RTX 3090" } + "machine": {"gpuDisplayName": "RTX 3090"} } ] } @@ -296,7 +295,7 @@ def test_get_pod(self): "vcpuCount": 21, "volumeInGb": 200, "volumeMountPath": "/workspace", - "machine": { "gpuDisplayName": "RTX 3090" } + "machine": {"gpuDisplayName": "RTX 3090"} } } } @@ -310,7 +309,7 @@ def test_create_template(self): Tests create_template ''' with patch("runpod.api.graphql.requests.post") as patch_request, \ - patch("runpod.api.ctl_commands.get_gpu") as patch_get_gpu: + patch("runpod.api.ctl_commands.get_gpu") as patch_get_gpu: patch_request.return_value.json.return_value = { "data": { @@ -329,12 +328,39 @@ def test_create_template(self): self.assertEqual(template["id"], "TEMPLATE_ID") + def test_get_endpoints(self): + ''' + Tests get_endpoints + ''' + with patch("runpod.api.graphql.requests.post") as patch_request: + patch_request.return_value.json.return_value = { + "data": { + "myself": { + "endpoints": [ + { + "id": "ENDPOINT_ID", + "name": "ENDPOINT_NAME", + "template": { + "id": "TEMPLATE_ID", + "imageName": "IMAGE_NAME" + } + } + ] + } + } + } + + endpoints = ctl_commands.get_endpoints() + + self.assertEqual(len(endpoints), 1) + self.assertEqual(endpoints[0]["id"], "ENDPOINT_ID") + def test_create_endpoint(self): ''' Tests create_endpoint ''' with patch("runpod.api.graphql.requests.post") as patch_request, \ - patch("runpod.api.ctl_commands.get_gpu") as patch_get_gpu: + patch("runpod.api.ctl_commands.get_gpu") as patch_get_gpu: patch_request.return_value.json.return_value = { "data": { @@ -352,3 +378,27 @@ def test_create_endpoint(self): ) self.assertEqual(endpoint["id"], "ENDPOINT_ID") + + def test_update_endpoint_template(self): + ''' + Tests update_endpoint_template + ''' + with patch("runpod.api.graphql.requests.post") as patch_request, \ + patch("runpod.api.ctl_commands.get_gpu") as patch_get_gpu: + + patch_request.return_value.json.return_value = { + "data": { + "updateEndpointTemplate": { + "id": "ENDPOINT_ID" + } + } + } + + patch_get_gpu.return_value = None + + endpoint = ctl_commands.update_endpoint_template( + endpoint_id="ENDPOINT_ID", + template_id="TEMPLATE_ID" + ) + + self.assertEqual(endpoint["id"], "ENDPOINT_ID") diff --git a/tests/test_cli/test_cli_groups/test_project_functions.py b/tests/test_cli/test_cli_groups/test_project_functions.py index 3a5a6ed0..0c165d14 100644 --- a/tests/test_cli/test_cli_groups/test_project_functions.py +++ b/tests/test_cli/test_cli_groups/test_project_functions.py @@ -4,11 +4,12 @@ import unittest from unittest.mock import patch, mock_open -from runpod.cli.groups.project.functions import( +from runpod.cli.groups.project.functions import ( STARTER_TEMPLATES, create_new_project, start_project, create_project_endpoint ) + class TestCreateNewProject(unittest.TestCase): """ Test the create_new_project function.""" @@ -16,7 +17,7 @@ class TestCreateNewProject(unittest.TestCase): @patch("os.path.exists", return_value=False) @patch("os.getcwd", return_value="/current/path") @patch("runpod.cli.groups.project.functions.copy_template_files") - def test_create_project_folder(self, mock_copy_template_files, mock_getcwd, mock_exists, mock_makedirs): # pylint: disable=line-too-long + def test_create_project_folder(self, mock_copy_template_files, mock_getcwd, mock_exists, mock_makedirs): # pylint: disable=line-too-long """ Test that a new project folder is created if init_current_dir is False. """ with patch("builtins.open", new_callable=mock_open): create_new_project("test_project", "volume_id", "11.1.1", "3.8") @@ -29,7 +30,7 @@ def test_create_project_folder(self, mock_copy_template_files, mock_getcwd, mock @patch('os.path.exists', return_value=False) @patch('os.getcwd', return_value='/tmp/testdir') @patch('builtins.open', new_callable=mock_open) - def test_create_new_project_init_current_dir(self, mock_file_open, mock_getcwd, mock_path_exists, mock_makedirs): # pylint: disable=line-too-long + def test_create_new_project_init_current_dir(self, mock_file_open, mock_getcwd, mock_path_exists, mock_makedirs): # pylint: disable=line-too-long """ Test that a new project folder is not created if init_current_dir is True. """ project_name = "test_project" runpod_volume_id = "12345" @@ -43,30 +44,29 @@ def test_create_new_project_init_current_dir(self, mock_file_open, mock_getcwd, assert mock_getcwd.called assert mock_path_exists.called is False - @patch("os.makedirs") @patch("os.path.exists", return_value=False) @patch("os.getcwd", return_value="/current/path") @patch("runpod.cli.groups.project.functions.copy_template_files") - def test_copy_template_files(self, mock_copy_template_files, mock_getcwd, mock_exists, mock_makedirs): # pylint: disable=line-too-long + def test_copy_template_files(self, mock_copy_template_files, mock_getcwd, mock_exists, mock_makedirs): # pylint: disable=line-too-long """ Test that template files are copied to the new project folder. """ with patch("builtins.open", new_callable=mock_open): create_new_project("test_project", "volume_id", "11.1.1", "3.8") - mock_copy_template_files.assert_called_once_with(STARTER_TEMPLATES + "/default", "/current/path/test_project") # pylint: disable=line-too-long + mock_copy_template_files.assert_called_once_with( + STARTER_TEMPLATES + "/default", "/current/path/test_project") # pylint: disable=line-too-long assert mock_getcwd.called assert mock_exists.called assert mock_makedirs.called @patch("os.path.exists", return_value=True) - @patch("builtins.open", new_callable=mock_open, read_data="data with <> placeholder") # pylint: disable=line-too-long - def test_replace_placeholders_in_handler(self, mock_open_file, mock_exists): # pylint: disable=line-too-long + @patch("builtins.open", new_callable=mock_open, read_data="data with <> placeholder") # pylint: disable=line-too-long + def test_replace_placeholders_in_handler(self, mock_open_file, mock_exists): # pylint: disable=line-too-long """ Test that placeholders in handler.py are replaced if model_name is given. """ with patch("runpod.cli.groups.project.functions.copy_template_files"): create_new_project("test_project", "volume_id", "11.8.0", "3.8", model_name="my_model") assert mock_open_file.called assert mock_exists.called - @patch("os.path.exists", return_value=False) @patch("builtins.open", new_callable=mock_open) def test_create_runpod_toml(self, mock_open_file, mock_exists): @@ -74,7 +74,8 @@ def test_create_runpod_toml(self, mock_open_file, mock_exists): with patch("runpod.cli.groups.project.functions.copy_template_files"): create_new_project("test_project", "volume_id", "11.8.0", "3.8") toml_file_location = os.path.join(os.getcwd(), "test_project", "runpod.toml") - mock_open_file.assert_called_with(toml_file_location, 'w', encoding="UTF-8") # pylint: disable=line-too-long + mock_open_file.assert_called_with( + toml_file_location, 'w', encoding="UTF-8") # pylint: disable=line-too-long assert mock_exists.called @patch("os.path.exists", return_value=True) @@ -82,7 +83,7 @@ def test_create_runpod_toml(self, mock_open_file, mock_exists): def test_update_requirements_file(self, mock_open_file, mock_exists): """ Test that placeholders in requirements.txt are replaced correctly. """ with patch("runpod.cli.groups.project.functions.__version__", "dev"), \ - patch("runpod.cli.groups.project.functions.copy_template_files"): + patch("runpod.cli.groups.project.functions.copy_template_files"): create_new_project("test_project", "volume_id", "11.8.0", "3.8") assert mock_open_file.called assert mock_exists.called @@ -92,7 +93,7 @@ def test_update_requirements_file(self, mock_open_file, mock_exists): def test_update_requirements_file_non_dev(self, mock_open_file, mock_exists): """ Test that placeholders in requirements.txt are replaced for non-dev versions. """ with patch("runpod.cli.groups.project.functions.__version__", "1.0.0"), \ - patch("runpod.cli.groups.project.functions.copy_template_files"): + patch("runpod.cli.groups.project.functions.copy_template_files"): create_new_project("test_project", "volume_id", "11.8.0", "3.8") assert mock_open_file.called assert mock_exists.called @@ -107,7 +108,7 @@ class TestStartProject(unittest.TestCase): @patch('runpod.cli.groups.project.functions.get_pod') @patch('runpod.cli.groups.project.functions.SSHConnection') @patch('os.getcwd', return_value='/current/path') - def test_start_nonexistent_successfully(self, mock_getcwd, mock_ssh_connection, mock_get_pod, mock_attempt_pod_launch, mock_get_project_pod, mock_load_project_config): # pylint: disable=line-too-long, too-many-arguments + def test_start_nonexistent_successfully(self, mock_getcwd, mock_ssh_connection, mock_get_pod, mock_attempt_pod_launch, mock_get_project_pod, mock_load_project_config): # pylint: disable=line-too-long, too-many-arguments """ Test that a project is launched successfully. """ mock_load_project_config.return_value = { 'project': { @@ -159,11 +160,12 @@ def test_failed_pod_launch(self, mock_attempt_pod, mock_get_pod): mock_attempt_pod.return_value = None mock_get_pod.return_value = None - with patch('builtins.print') as mock_print,\ - patch('runpod.cli.groups.project.functions.load_project_config'): - start_project() - mock_print.assert_called_with("Selected GPU types unavailable, try again later or use a different type.") # pylint: disable=line-too-long + with patch('builtins.print') as mock_print, \ + patch('runpod.cli.groups.project.functions.load_project_config'): + start_project() + mock_print.assert_called_with( + "Selected GPU types unavailable, try again later or use a different type.") # pylint: disable=line-too-long class TestStartProjectAPI(unittest.TestCase): @@ -174,7 +176,7 @@ class TestStartProjectAPI(unittest.TestCase): @patch('runpod.cli.groups.project.functions.SSHConnection') @patch('os.getcwd', return_value='/current/path') @patch('runpod.cli.groups.project.functions.sync_directory') - def test_start_project_api_successfully(self, mock_sync_directory, mock_getcwd, mock_ssh_connection, mock_get_project_pod, mock_load_project_config): # pylint: disable=line-too-long, too-many-arguments + def test_start_project_api_successfully(self, mock_sync_directory, mock_getcwd, mock_ssh_connection, mock_get_project_pod, mock_load_project_config): # pylint: disable=line-too-long, too-many-arguments """ Test that a project API is started successfully. """ mock_load_project_config.return_value = { 'project': { @@ -200,24 +202,36 @@ def test_start_project_api_successfully(self, mock_sync_directory, mock_getcwd, mock_get_project_pod.assert_called_with('123456') mock_ssh_connection.assert_called_with({'id': 'pod_id'}) mock_sync_directory.assert_called_with(mock_ssh_instance, - '/current/path', '/mount/path/123456') + '/current/path', '/mount/path/123456/dev') mock_ssh_instance.run_commands.assert_called() assert mock_getcwd.called - class TestCreateProjectEndpoint(unittest.TestCase): """ Test the create_project_endpoint function. """ + @patch('runpod.cli.groups.project.functions.SSHConnection') @patch('runpod.cli.groups.project.functions.load_project_config') @patch('runpod.cli.groups.project.functions.create_template') @patch('runpod.cli.groups.project.functions.create_endpoint') - def test_create_project_endpoint(self, mock_create_endpoint, - mock_create_template, mock_load_project_config): + @patch('runpod.cli.groups.project.functions.update_endpoint_template') + @patch('runpod.cli.groups.project.functions.get_project_pod') + @patch('runpod.cli.groups.project.functions.get_project_endpoint') + def test_create_project_endpoint(self, mock_get_project_endpoint, mock_get_project_pod, mock_update_endpoint, mock_create_endpoint, # pylint: disable=too-many-arguments,line-too-long + mock_create_template, mock_load_project_config, mock_ssh_connection): # pylint: disable=line-too-long """ Test that a project endpoint is created successfully. """ + mock_get_project_endpoint.return_value = False + + mock_get_project_pod.return_value = None + with patch('runpod.cli.groups.project.functions._launch_dev_pod') as mock_launch_dev_pod: + mock_launch_dev_pod.return_value = None + assert create_project_endpoint() is None + + mock_get_project_pod.return_value = {'id': 'test_pod_id'} mock_load_project_config.return_value = { 'project': { 'name': 'test_project', + 'volume_mount_path': '/runpod-volume/123456', 'uuid': '123456', 'env_vars': {'TEST_VAR': 'value'}, 'base_image': 'test_image', @@ -225,25 +239,37 @@ def test_create_project_endpoint(self, mock_create_endpoint, 'storage_id': 'test_storage_id', }, 'runtime': { - 'handler_path': 'handler.py' + 'python_version': '3.8', + 'handler_path': 'handler.py', + 'requirements_path': 'requirements.txt' } } mock_create_template.return_value = {'id': 'test_template_id'} mock_create_endpoint.return_value = {'id': 'test_endpoint_id'} - result = create_project_endpoint() + mock_ssh_instance = mock_ssh_connection.return_value + mock_ssh_instance.__enter__.return_value = mock_ssh_instance + mock_ssh_instance.run_commands.return_value = None + + with patch('runpod.cli.groups.project.functions.datetime') as mock_datetime: + mock_datetime.now.return_value = '123456' + result = create_project_endpoint() self.assertEqual(result, 'test_endpoint_id') - mock_create_template.assert_called_once_with( - name='test_project-endpoint | 123456', + mock_create_template.assert_called_with( + name='test_project-endpoint | 123456 | 123456', image_name='test_image', container_disk_in_gb=10, - docker_start_cmd='bash -c ". /runpod-volume/123456/venv/bin/activate && python -u /runpod-volume/123456/test_project/handler.py"', # pylint: disable=line-too-long + docker_start_cmd='bash -c ". /runpod-volume/123456/prod/venv/bin/activate && python -u /runpod-volume/123456/prod/test_project/handler.py"', # pylint: disable=line-too-long env={'TEST_VAR': 'value'}, is_serverless=True ) - mock_create_endpoint.assert_called_once_with( + mock_create_endpoint.assert_called_with( name='test_project-endpoint | 123456', template_id='test_template_id', network_volume_id='test_storage_id' ) + + mock_update_endpoint.return_value = {'id': 'test_endpoint_id'} + mock_get_project_endpoint.return_value = {'id': 'test_endpoint_id'} + self.assertEqual(create_project_endpoint(), 'test_endpoint_id') diff --git a/tests/test_cli/test_cli_groups/test_project_helpers.py b/tests/test_cli/test_cli_groups/test_project_helpers.py index 1bfa146f..e58dc829 100644 --- a/tests/test_cli/test_cli_groups/test_project_helpers.py +++ b/tests/test_cli/test_cli_groups/test_project_helpers.py @@ -9,6 +9,7 @@ from runpod.cli.groups.project.helpers import ( validate_project_name, get_project_pod, + get_project_endpoint, copy_template_files, attempt_pod_launch, load_project_config @@ -44,6 +45,16 @@ def test_get_project_pod_not_exists(self, mock_get_pods): result = get_project_pod("1234") self.assertIsNone(result) + @patch("runpod.cli.groups.project.helpers.get_endpoints") + def test_get_project_endpoint_exists(self, mock_get_endpoints): + """Test the get_project_endpoint function when the project endpoint exists.""" + mock_get_endpoints.return_value = [] + assert get_project_endpoint("1234") is None + + mock_get_endpoints.return_value = [{"name": "test-1234", "id": "endpoint_id"}] + result = get_project_endpoint("1234") + self.assertEqual(result, {"name": "test-1234", "id": "endpoint_id"}) + @patch("os.listdir") @patch("os.path.isdir", return_value=False) @patch("shutil.copy2") @@ -64,7 +75,6 @@ def test_copy_template_files_dir(self, mock_copy, mock_isdir, mock_listdir): self.assertEqual(mock_copy.call_count, 2) assert mock_isdir.called - @patch("runpod.cli.groups.project.helpers.create_pod") def test_attempt_pod_launch_success(self, mock_create_pod): """Test the attempt_pod_launch function when it succeeds.""" @@ -89,7 +99,6 @@ def test_attempt_pod_launch_success(self, mock_create_pod): mock_create_pod.side_effect = rp_error.QueryError("error") assert attempt_pod_launch(config, environment_variables) is None - @patch("os.path.exists", return_value=True) @patch("builtins.open", new_callable=mock_open, read_data="[project]\nname='test'") def test_load_project_config(self, mock_file, mock_exists): @@ -99,7 +108,6 @@ def test_load_project_config(self, mock_file, mock_exists): assert mock_exists.called assert mock_file.called - with patch("os.path.exists", return_value=False), \ - self.assertRaises(FileNotFoundError): + self.assertRaises(FileNotFoundError): load_project_config() diff --git a/tests/test_serverless/test_modules/test_job.py b/tests/test_serverless/test_modules/test_job.py index c6897f7c..588bfd76 100644 --- a/tests/test_serverless/test_modules/test_job.py +++ b/tests/test_serverless/test_modules/test_job.py @@ -10,6 +10,7 @@ from runpod.serverless.modules import rp_job + class TestJob(IsolatedAsyncioTestCase): ''' Tests the Job class. ''' @@ -38,19 +39,18 @@ async def test_get_job_200(self): response4.json = make_mocked_coro(return_value={"id": "123", "input": {"number": 1}}) with patch("aiohttp.ClientSession") as mock_session, \ - patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): + patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): # Set side_effect to a list of mock responses mock_session.get.return_value.__aenter__.side_effect = [ response1, response2, response3, response4 - ] + ] job = await rp_job.get_job(mock_session, retry=True) # Assertions for the success case assert job == {"id": "123", "input": {"number": 1}} - async def test_get_job_204(self): ''' Tests the get_job function with a 204 response @@ -61,7 +61,7 @@ async def test_get_job_204(self): response_204.json = make_mocked_coro(return_value=None) with patch("aiohttp.ClientSession") as mock_session_204, \ - patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): + patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): mock_session_204.get.return_value.__aenter__.return_value = response_204 job = await rp_job.get_job(mock_session_204, retry=False) @@ -78,14 +78,13 @@ async def test_get_job_400(self): response_400.status = 400 with patch("aiohttp.ClientSession") as mock_session_400, \ - patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): + patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): mock_session_400.get.return_value.__aenter__.return_value = response_400 job = await rp_job.get_job(mock_session_400, retry=False) assert job is None - async def test_get_job_500(self): ''' Tests the get_job function with a 500 response @@ -95,14 +94,13 @@ async def test_get_job_500(self): response_500.status = 500 with patch("aiohttp.ClientSession") as mock_session_500, \ - patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): + patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): mock_session_500.get.return_value.__aenter__.return_value = response_500 job = await rp_job.get_job(mock_session_500, retry=False) assert job is None - async def test_get_job_no_id(self): ''' Tests the get_job function with a 200 response but no id @@ -111,10 +109,9 @@ async def test_get_job_no_id(self): response.status = 200 response.json = make_mocked_coro(return_value={}) - with patch("aiohttp.ClientSession") as mock_session, \ - patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \ - patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): + patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \ + patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): mock_session.get.return_value.__aenter__.return_value = response @@ -131,10 +128,9 @@ async def test_get_job_no_input(self): response.status = 200 response.json = make_mocked_coro(return_value={"id": "123"}) - with patch("aiohttp.ClientSession") as mock_session, \ - patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \ - patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): + patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \ + patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): mock_session.get.return_value.__aenter__.return_value = response @@ -152,8 +148,8 @@ async def test_get_job_exception(self): response_exception.status = 200 with patch("aiohttp.ClientSession") as mock_session_exception, \ - patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \ - patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): + patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \ + patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): mock_session_exception.get.return_value.__aenter__.side_effect = Exception job = await rp_job.get_job(mock_session_exception, retry=False) @@ -161,6 +157,7 @@ async def test_get_job_exception(self): assert job is None assert mock_log.error.call_count == 1 + class TestRunJob(IsolatedAsyncioTestCase): ''' Tests the run_job function ''' @@ -184,7 +181,7 @@ async def test_simple_job(self): mock_handler.return_value = ['test1', 'test2'] job_result_list = await rp_job.run_job(mock_handler, self.sample_job) - assert job_result_list == {"output":["test1", "test2"]} + assert job_result_list == {"output": ["test1", "test2"]} mock_handler.return_value = 123 job_result_int = await rp_job.run_job(mock_handler, self.sample_job) @@ -249,14 +246,14 @@ async def test_job_with_exception(self): class TestRunJobGenerator(IsolatedAsyncioTestCase): ''' Tests the run_job_generator function ''' - def handler_gen_success(self, job): # pylint: disable=unused-argument + def handler_gen_success(self, job): # pylint: disable=unused-argument ''' Test handler that returns a generator. ''' yield "partial_output_1" yield "partial_output_2" - async def handler_async_gen_success(self, job): # pylint: disable=unused-argument + async def handler_async_gen_success(self, job): # pylint: disable=unused-argument ''' Test handler that returns an async generator. ''' @@ -267,7 +264,7 @@ def handler_fail(self, job): ''' Test handler that raises an exception. ''' - raise Exception("Test Exception") # pylint: disable=broad-exception-raised + raise Exception("Test Exception") # pylint: disable=broad-exception-raised async def test_run_job_generator_success(self): ''' @@ -282,7 +279,7 @@ async def test_run_job_generator_success(self): assert result == [{"output": "partial_output_1"}, {"output": "partial_output_2"}] assert mock_log.error.call_count == 0 assert mock_log.info.call_count == 1 - mock_log.info.assert_called_with('123 | Finished ') + mock_log.info.assert_called_with('Finished', '123') async def test_run_job_generator_success_async(self): ''' @@ -297,7 +294,7 @@ async def test_run_job_generator_success_async(self): assert result == [{"output": "partial_output_1"}, {"output": "partial_output_2"}] assert mock_log.error.call_count == 0 assert mock_log.info.call_count == 1 - mock_log.info.assert_called_with('123 | Finished ') + mock_log.info.assert_called_with('Finished', '123') async def test_run_job_generator_exception(self): ''' @@ -313,4 +310,4 @@ async def test_run_job_generator_exception(self): assert "error" in result[0] assert mock_log.error.call_count == 1 assert mock_log.info.call_count == 1 - mock_log.info.assert_called_with('123 | Finished ') + mock_log.info.assert_called_with('Finished', '123')