diff --git a/.github/tests.json b/.github/tests.json index 78c2ef25..384dec13 100644 --- a/.github/tests.json +++ b/.github/tests.json @@ -2,7 +2,7 @@ { "hardwareConfig": { "endpointConfig": { - "gpuIds": "AMPERE_16", + "gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80", "name": "runpod-python E2E Test - Basic" } }, @@ -13,7 +13,7 @@ { "hardwareConfig": { "endpointConfig": { - "gpuIds": "AMPERE_16", + "gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80", "name": "runpod-python E2E Test - Long Job" } }, @@ -25,7 +25,7 @@ { "hardwareConfig": { "endpointConfig": { - "gpuIds": "AMPERE_16", + "gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80", "name": "runpod-python E2E Test - Generator Handler" }, "templateConfig": { @@ -43,7 +43,7 @@ { "hardwareConfig": { "endpointConfig": { - "gpuIds": "AMPERE_16", + "gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80", "name": "runpod-python E2E Test - Async Generator Handler" }, "templateConfig": { diff --git a/.github/workflows/CI-e2e.yml b/.github/workflows/CI-e2e.yml index 3c813bfd..6f1062dd 100644 --- a/.github/workflows/CI-e2e.yml +++ b/.github/workflows/CI-e2e.yml @@ -63,9 +63,22 @@ jobs: steps: - uses: actions/checkout@v4 + - name: Run Tests + id: run-tests uses: direlines/runpod-test-runner@v1.7 with: image-tag: ${{ vars.DOCKERHUB_REPO }}/${{ vars.DOCKERHUB_IMG }}:${{ needs.e2e-build.outputs.docker_tag }} runpod-api-key: ${{ secrets.RUNPOD_API_KEY }} - request-timeout: 600 + request-timeout: 1200 + + - name: Verify Tests + env: + TOTAL_TESTS: ${{ steps.run-tests.outputs.total-tests }} + SUCCESSFUL_TESTS: ${{ steps.run-tests.outputs.succeeded }} + run: | + echo "Total tests: $TOTAL_TESTS" + echo "Successful tests: $SUCCESSFUL_TESTS" + if [ "$TOTAL_TESTS" != "$SUCCESSFUL_TESTS" ]; then + exit 1 + fi diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f6b9da6..88c9b11c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,53 @@ # Change Log -## Release 1.2.1 (89/22/23) +## Release 1.2.6 (10/6/23) + 
+- `support_public_ip` now defaults to `True` when creating new pods.
""" import os +import logging -from .version import __version__ from . import serverless from .endpoint import Endpoint from .endpoint import AsyncioEndpoint, AsyncioJob +from .version import __version__ from .api.ctl_commands import( get_user, update_user_settings, get_gpus, get_gpu, @@ -30,3 +31,8 @@ api_url_base = "https://api.runpod.io" # pylint: disable=invalid-name endpoint_url_base = "https://api.runpod.ai/v2" # pylint: disable=invalid-name + + +# --------------------------- Force Logging Levels --------------------------- # +logging.getLogger("urllib3").setLevel(logging.WARNING) + diff --git a/runpod/api/ctl_commands.py b/runpod/api/ctl_commands.py index ffc762b4..b7a17235 100644 --- a/runpod/api/ctl_commands.py +++ b/runpod/api/ctl_commands.py @@ -78,9 +78,9 @@ def create_pod( cloud_type:str="ALL", support_public_ip:bool=True, start_ssh:bool=True, data_center_id : Optional[str]=None, country_code:Optional[str]=None, - gpu_count:int=1, volume_in_gb:int=0, container_disk_in_gb:int=5, + gpu_count:int=1, volume_in_gb:int=0, container_disk_in_gb:Optional[int]=None, min_vcpu_count:int=1, min_memory_in_gb:int=1, docker_args:str="", - ports:Optional[str]=None, volume_mount_path:str="/runpod_volume", + ports:Optional[str]=None, volume_mount_path:str="/runpod-volume", env:Optional[dict]=None, template_id:Optional[str]=None, network_volume_id:Optional[str]=None ) -> dict: @@ -118,6 +118,9 @@ def create_pod( data_center_id = network_volume["dataCenterId"] break + if container_disk_in_gb is None and template_id is None: + container_disk_in_gb = 10 + raw_response = run_graphql_query( pod_mutations.generate_pod_deployment_mutation( name, image_name, gpu_type_id, diff --git a/runpod/serverless/modules/rp_fastapi.py b/runpod/serverless/modules/rp_fastapi.py index f618cb30..7d358704 100644 --- a/runpod/serverless/modules/rp_fastapi.py +++ b/runpod/serverless/modules/rp_fastapi.py @@ -11,7 +11,8 @@ from .rp_job import run_job from .worker_state import Jobs from .rp_ping 
import Heartbeat -from ...version import __version__ as runpod_Version +from ...version import __version__ as runpod_version + RUNPOD_ENDPOINT_ID = os.environ.get("RUNPOD_ENDPOINT_ID", None) @@ -68,7 +69,7 @@ def __init__(self, handler=None): self.rp_app = FastAPI( title="RunPod | Test Worker | API", description=DESCRIPTION, - version=runpod_Version + version=runpod_version, ) # Create an APIRouter and add the route for processing jobs. diff --git a/runpod/serverless/modules/rp_job.py b/runpod/serverless/modules/rp_job.py index ef233e31..411d6874 100644 --- a/runpod/serverless/modules/rp_job.py +++ b/runpod/serverless/modules/rp_job.py @@ -14,6 +14,7 @@ from runpod.serverless.modules.rp_logger import RunPodLogger from .worker_state import WORKER_ID, Jobs from .rp_tips import check_return_size +from ...version import __version__ as runpod_version JOB_GET_URL = str(os.environ.get('RUNPOD_WEBHOOK_GET_JOB')).replace('$ID', WORKER_ID) @@ -108,51 +109,54 @@ async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]: Returns the job output or error. 
""" log.info(f'{job["id"]} | Started') + run_result = {"error": "No output from handler."} try: - result = handler(job) - job_output = await result if inspect.isawaitable(result) else result + handler_return = handler(job) + job_output = await handler_return if inspect.isawaitable(handler_return) else handler_return log.debug(f'{job["id"]} | Handler output: {job_output}') - run_result = {"output": job_output} - if isinstance(job_output, dict): + error_msg = job_output.pop("error", None) + refresh_worker = job_output.pop("refresh_worker", None) - if job_output.get("error", False): - run_result["error"] = str(run_result["output"].pop("error")) + run_result = {"output": job_output} - if job_output.get("refresh_worker", False): + if error_msg: + run_result["error"] = error_msg + if refresh_worker: run_result["stopPod"] = True - run_result["output"].pop("refresh_worker") - - if run_result["output"] == {}: - run_result.pop("output") elif isinstance(job_output, bool): run_result = {"output": job_output} + else: + run_result = {"output": job_output} + + if run_result.get("output") == {}: + run_result.pop("output") + check_return_size(run_result) # Checks the size of the return body. 
+ except Exception as err: # pylint: disable=broad-except - from runpod import __version__ as runpod_version # pylint: disable=import-outside-toplevel,cyclic-import - error_content = json.dumps( - { - "error_type": str(type(err)), - "error_message": str(err), - "error_traceback": traceback.format_exc(), - "hostname": os.environ.get("RUNPOD_POD_HOSTNAME", "unknown"), - "worker_id": os.environ.get("RUNPOD_POD_ID", "unknown"), - "runpod_version": runpod_version - }, indent=4) + error_info = { + "error_type": str(type(err)), + "error_message": str(err), + "error_traceback": traceback.format_exc(), + "hostname": os.environ.get("RUNPOD_POD_HOSTNAME", "unknown"), + "worker_id": os.environ.get("RUNPOD_POD_ID", "unknown"), + "runpod_version": runpod_version + } log.error(f'{job["id"]} | Captured Handler Exception') - log.error(error_content) + log.error(json.dumps(error_info, indent=4)) + run_result = {"error": json.dumps(error_info)} - run_result = {"error": error_content} finally: log.debug(f'{job["id"]} | run_job return: {run_result}') - return run_result # pylint: disable=lost-exception + return run_result async def run_job_generator( diff --git a/runpod/serverless/modules/rp_ping.py b/runpod/serverless/modules/rp_ping.py index 986a8e32..946b77dd 100644 --- a/runpod/serverless/modules/rp_ping.py +++ b/runpod/serverless/modules/rp_ping.py @@ -11,6 +11,7 @@ from runpod.serverless.modules.rp_logger import RunPodLogger from .worker_state import Jobs, WORKER_ID +from ...version import __version__ as runpod_version log = RunPodLogger() jobs = Jobs() # Contains the list of jobs that are currently running. @@ -47,8 +48,6 @@ def __init__(self, pool_connections=10, retries=3) -> None: self._session.mount('http://', adapter) self._session.mount('https://', adapter) - self.runpod_version = None - def start_ping(self, test=False): ''' Sends heartbeat pings to the Runpod server. 
@@ -58,9 +57,6 @@ def start_ping(self, test=False): return if not Heartbeat._thread_started: - from runpod import __version__ as runpod_version # pylint: disable=import-outside-toplevel,cyclic-import - self.runpod_version = runpod_version - threading.Thread(target=self.ping_loop, daemon=True, args=(test,)).start() Heartbeat._thread_started = True @@ -82,7 +78,7 @@ def _send_ping(self): job_ids = jobs.get_job_list() ping_params = { 'job_id': job_ids, - 'runpod_version': self.runpod_version + 'runpod_version': runpod_version } try: diff --git a/runpod/serverless/utils/rp_upload.py b/runpod/serverless/utils/rp_upload.py index 1d9b82ec..7c5d9dfc 100644 --- a/runpod/serverless/utils/rp_upload.py +++ b/runpod/serverless/utils/rp_upload.py @@ -14,7 +14,7 @@ from typing import Optional, Tuple import boto3 -from PIL import Image +from PIL import Image, UnidentifiedImageError from boto3 import session from boto3.s3.transfer import TransferConfig from botocore.config import Config @@ -96,10 +96,11 @@ def get_boto_client( # ---------------------------------------------------------------------------- # def upload_image(job_id, image_location, result_index=0, results_list=None): # pragma: no cover ''' - Upload image to bucket storage. + Upload a single file to bucket storage. 
''' image_name = str(uuid.uuid4())[:8] boto_client, _ = get_boto_client() + file_extension = os.path.splitext(image_location)[1] if boto_client is None: # Save the output to a file @@ -108,39 +109,53 @@ def upload_image(job_id, image_location, result_index=0, results_list=None): # p print("https://github.com/runpod/runpod-python/blob/main/docs/serverless/worker-utils.md") os.makedirs("simulated_uploaded", exist_ok=True) - sim_upload_location = f"simulated_uploaded/{image_name}.png" - with Image.open(image_location) as img, open(sim_upload_location, "wb") as file_output: - img.save(file_output, format=img.format) + sim_upload_location = f"simulated_uploaded/{image_name}{file_extension}" + try: + with Image.open(image_location) as img, open(sim_upload_location, "wb") as file_output: + img.save(file_output, format=img.format) + + except UnidentifiedImageError: + # If the file is not an image, save it directly + shutil.copy(image_location, sim_upload_location) if results_list is not None: results_list[result_index] = sim_upload_location return sim_upload_location - with Image.open(image_location) as img: - output = BytesIO() - img.save(output, format=img.format) - output.seek(0) - - bucket = time.strftime('%m-%y') - boto_client.put_object( - Bucket=f'{bucket}', - Key=f'{job_id}/{image_name}.png', - Body=output.getvalue(), - ContentType="image/png" - ) + try: + with Image.open(image_location) as img: + output = BytesIO() + img.save(output, format=img.format) + output.seek(0) + content_type = "image/" + file_extension.lstrip(".") + + except UnidentifiedImageError: + # If the file is not an image, read it directly + with open(image_location, "rb") as f: + output = f.read() + content_type = "application/octet-stream" + + + bucket = time.strftime('%m-%y') + boto_client.put_object( + Bucket=f'{bucket}', + Key=f'{job_id}/{image_name}{file_extension}', + Body=output, + ContentType=content_type + ) - presigned_url = boto_client.generate_presigned_url( - 'get_object', - Params={ 
- 'Bucket': f'{bucket}', - 'Key': f'{job_id}/{image_name}.png' - }, ExpiresIn=604800) + presigned_url = boto_client.generate_presigned_url( + 'get_object', + Params={ + 'Bucket': f'{bucket}', + 'Key': f'{job_id}/{image_name}{file_extension}' + }, ExpiresIn=604800) - if results_list is not None: - results_list[result_index] = presigned_url + if results_list is not None: + results_list[result_index] = presigned_url - return presigned_url + return presigned_url # ---------------------------------------------------------------------------- # diff --git a/setup.cfg b/setup.cfg index 6fc38519..44711a07 100644 --- a/setup.cfg +++ b/setup.cfg @@ -43,8 +43,8 @@ install_requires = requests >= 2.31.0 tomli >= 2.0.1 tqdm-loggable >= 0.1.4 + fastapi[all] >= 0.103.2 watchdog >= 3.0.0 - fastapi[all] >= 0.99.0 # Minimum versions for dependencies urllib3 >= 1.26.6 diff --git a/tests/test_api/test_ctl_commands.py b/tests/test_api/test_ctl_commands.py index d83b27e7..39e71fe3 100644 --- a/tests/test_api/test_ctl_commands.py +++ b/tests/test_api/test_ctl_commands.py @@ -104,7 +104,7 @@ def test_create_pod(self): Tests create_pod ''' with patch("runpod.api.graphql.requests.post") as patch_request, \ - patch("runpod.api.ctl_commands.get_gpu") as patch_get_gpu,\ + patch("runpod.api.ctl_commands.get_gpu") as patch_get_gpu, \ patch("runpod.api.ctl_commands.get_user") as patch_get_user: patch_request.return_value.json.return_value = { @@ -116,6 +116,7 @@ def test_create_pod(self): } patch_get_gpu.return_value = None + patch_get_user.return_value = { "networkVolumes": [ { @@ -128,6 +129,8 @@ def test_create_pod(self): pod = ctl_commands.create_pod( name="POD_NAME", image_name="IMAGE_NAME", + support_public_ip=False, + gpu_type_id="NVIDIA A100 80GB PCIe", network_volume_id="NETWORK_VOLUME_ID") @@ -136,9 +139,10 @@ def test_create_pod(self): with self.assertRaises(ValueError) as context: pod = ctl_commands.create_pod( name="POD_NAME", + cloud_type="NOT_A_CLOUD_TYPE", image_name="IMAGE_NAME", 
gpu_type_id="NVIDIA A100 80GB PCIe", - cloud_type="NOT A CLOUD TYPE") + network_volume_id="NETWORK_VOLUME_ID") self.assertEqual(str(context.exception), "cloud_type must be one of ALL, COMMUNITY or SECURE") diff --git a/tests/test_serverless/test_modules/test_job.py b/tests/test_serverless/test_modules/test_job.py index 1eaa0611..c6897f7c 100644 --- a/tests/test_serverless/test_modules/test_job.py +++ b/tests/test_serverless/test_modules/test_job.py @@ -177,12 +177,19 @@ async def test_simple_job(self): Tests the run_job function ''' mock_handler = Mock() - mock_handler.return_value = "test" + mock_handler.return_value = "test" job_result = await rp_job.run_job(mock_handler, self.sample_job) - assert job_result == {"output": "test"} + mock_handler.return_value = ['test1', 'test2'] + job_result_list = await rp_job.run_job(mock_handler, self.sample_job) + assert job_result_list == {"output":["test1", "test2"]} + + mock_handler.return_value = 123 + job_result_int = await rp_job.run_job(mock_handler, self.sample_job) + assert job_result_int == {"output": 123} + async def test_job_with_errors(self): ''' Tests the run_job function with errors @@ -194,6 +201,17 @@ async def test_job_with_errors(self): assert job_result == {"error": "test"} + async def test_job_with_raised_exception(self): + ''' + Tests the run_job function with a raised exception + ''' + mock_handler = Mock() + mock_handler.side_effect = Exception + + job_result = await rp_job.run_job(mock_handler, self.sample_job) + + assert "error" in job_result + async def test_job_with_refresh_worker(self): ''' Tests the run_job function with refresh_worker