diff --git a/.github/workflows/CI-pylint.yml b/.github/workflows/CI-pylint.yml index 4e1312a4..ecb73275 100644 --- a/.github/workflows/CI-pylint.yml +++ b/.github/workflows/CI-pylint.yml @@ -5,9 +5,9 @@ on: branches-ignore: - "main-ci" - "release" - pull_request: - branches: - - main + # pull_request: + # branches: + # - main workflow_dispatch: diff --git a/.github/workflows/CI-tests.yml b/.github/workflows/CI-tests.yml index 41c4cab2..3e1edc4b 100644 --- a/.github/workflows/CI-tests.yml +++ b/.github/workflows/CI-tests.yml @@ -6,9 +6,9 @@ on: - main-ci - release - pull_request: - branches: - - main + # pull_request: + # branches: + # - main workflow_dispatch: @@ -38,4 +38,4 @@ jobs: - name: Run Tests run: | - pytest --timeout=120 --timeout_method=thread --cov=runpod --cov-report=xml --cov-report=term-missing --cov-fail-under=100 + pytest --timeout=120 --timeout_method=thread --cov=runpod --cov-report=xml --cov-report=term-missing --cov-fail-under=100 -W error -p no:cacheprovider -p no:unraisableexception diff --git a/requirements.txt b/requirements.txt index 416bfa7a..28a56e59 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ aiohttp >= 3.8.4 +aiohttp-retry >= 2.8.3 backoff == 2.2.1 boto3 >= 1.26.165 diff --git a/runpod/serverless/modules/retry.py b/runpod/serverless/modules/retry.py deleted file mode 100644 index e56f0596..00000000 --- a/runpod/serverless/modules/retry.py +++ /dev/null @@ -1,40 +0,0 @@ -''' -retry helper -''' -import random -import asyncio -from functools import wraps - - -def retry(max_attempts, base_delay, max_delay): - ''' - A decorator to retry async functions in case of exceptions. - - Args: - max_attempts: The maximum number of attempts to retry. - base_delay: The base delay in seconds between retries. - max_delay: The maximum delay in seconds between retries. - - Returns: - A decorated async function with retry capabilities. - ''' - def decorator(func): - @wraps(func) - async def wrapper(*args, **kwargs): - attempt = 1 - while True: - try: - return await func(*args, **kwargs) - except Exception as err: # pylint: disable=broad-except - if attempt >= max_attempts: - raise err - - # Calculate delay using exponential backoff with random jitter - delay = min(base_delay * (2 ** (attempt - 1)), max_delay) - delay *= random.uniform(0.5, 1.5) - - # Wait for the delay before retrying - await asyncio.sleep(delay) - attempt += 1 - return wrapper - return decorator diff --git a/runpod/serverless/modules/rp_http.py b/runpod/serverless/modules/rp_http.py index 9fcc26ec..a4105821 100644 --- a/runpod/serverless/modules/rp_http.py +++ b/runpod/serverless/modules/rp_http.py @@ -4,67 +4,71 @@ import os import json +import aiohttp +from aiohttp_retry import RetryClient, ExponentialRetry from runpod.serverless.modules.rp_logger import RunPodLogger -from .worker_state import Jobs -from .retry import retry -from .worker_state import WORKER_ID +from .worker_state import Jobs, WORKER_ID -JOB_DONE_URL_TEMPLATE = str(os.environ.get('RUNPOD_WEBHOOK_POST_OUTPUT')) -JOB_DONE_URL_TEMPLATE = JOB_DONE_URL_TEMPLATE.replace('$RUNPOD_POD_ID', WORKER_ID) +JOB_DONE_URL_TEMPLATE = str(os.environ.get('RUNPOD_WEBHOOK_POST_OUTPUT', 'JOB_DONE_URL')) +JOB_DONE_URL = JOB_DONE_URL_TEMPLATE.replace('$RUNPOD_POD_ID', WORKER_ID) -JOB_STREAM_URL_TEMPLATE = str(os.environ.get('RUNPOD_WEBHOOK_POST_STREAM')) -JOB_STREAM_URL_TEMPLATE = JOB_STREAM_URL_TEMPLATE.replace('$RUNPOD_POD_ID', WORKER_ID) +JOB_STREAM_URL_TEMPLATE = str(os.environ.get('RUNPOD_WEBHOOK_POST_STREAM', 'JOB_STREAM_URL')) +JOB_STREAM_URL = JOB_STREAM_URL_TEMPLATE.replace('$RUNPOD_POD_ID', WORKER_ID) log = RunPodLogger() job_list = Jobs() -@retry(max_attempts=3, base_delay=1, max_delay=3) -async def transmit(session, job_data, url): +async def _transmit(client_session, url, job_data ): """ - Wrapper for sending results. + Wrapper for transmitting results via POST. """ - headers = { - "charset": "utf-8", - "Content-Type": "application/x-www-form-urlencoded" - } + retry_options = ExponentialRetry(attempts=3) + retry_client = RetryClient(client_session=client_session, retry_options=retry_options) - async with session.post(url, - data=job_data, - headers=headers, - raise_for_status=True) as resp: - await resp.text() + kwargs = { + "data": job_data, + "headers": {"charset": "utf-8", "Content-Type": "application/x-www-form-urlencoded"}, + "raise_for_status": True + } + async with retry_client.post(url, **kwargs) as client_response: + await client_response.text() -async def send_result(session, job_data, job): - ''' - Return the job results. - ''' + +async def _handle_result(session, job_data, job, url_template, log_message): + """ + A helper function to handle the result, either for sending or streaming. + """ try: - job_data = json.dumps(job_data, ensure_ascii=False) - job_done_url = JOB_DONE_URL_TEMPLATE.replace('$ID', job['id']) + serialized_job_data = json.dumps(job_data, ensure_ascii=False) + url = url_template.replace('$ID', job['id']) - await transmit(session, job_data, job_done_url) - log.debug(f"{job['id']} | Results sent.") + await _transmit(session, url, serialized_job_data) + log.debug(f"{job['id']} | {log_message}") - job_list.remove_job(job["id"]) - log.info(f'{job["id"]} | Finished') + except aiohttp.ClientResponseError as err: + log.error(f"{job['id']} | Client response error while transmitting job. | {err}") - except Exception as err: # pylint: disable=broad-except + except (TypeError, RuntimeError) as err: log.error(f"Error while returning job result {job['id']}: {err}") + finally: + if url_template == JOB_DONE_URL: + job_list.remove_job(job["id"]) + log.info(f'{job["id"]} | Finished') -async def stream_result(session, job_data, job): - ''' - Return the stream job results. - ''' - try: - job_data = json.dumps(job_data, ensure_ascii=False) - job_done_url = JOB_STREAM_URL_TEMPLATE.replace('$ID', job['id']) - await transmit(session, job_data, job_done_url) - log.debug(f"{job['id']} | Intermediate Results sent.") +async def send_result(session, job_data, job): + """ + Return the job results. + """ + await _handle_result(session, job_data, job, JOB_DONE_URL, "Results sent.") + - except Exception as err: # pylint: disable=broad-except - log.error(f"Error while returning job result {job['id']}: {err}") +async def stream_result(session, job_data, job): + """ + Return the stream job results. + """ + await _handle_result(session, job_data, job, JOB_STREAM_URL, "Intermediate results sent.") diff --git a/setup.cfg b/setup.cfg index eda01308..899d55fe 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,6 +27,7 @@ packages = find: python_requires = >= 3.10 install_requires = aiohttp >= 3.8.4 + aiohttp-retry >= 2.8.3 backoff >= 2.2.1 boto3 >= 1.26.165 click >= 8.1.7 diff --git a/tests/test_endpoint/test_asyncio_runner.py b/tests/test_endpoint/test_asyncio_runner.py index 4c4316de..5a18e552 100644 --- a/tests/test_endpoint/test_asyncio_runner.py +++ b/tests/test_endpoint/test_asyncio_runner.py @@ -1,13 +1,11 @@ ''' Unit tests for the asyncio_runner module. ''' # pylint: disable=too-few-public-methods - import tracemalloc import asyncio import unittest from unittest.mock import patch, MagicMock from unittest import IsolatedAsyncioTestCase -import pytest from runpod.endpoint.asyncio.asyncio_runner import Job, Endpoint @@ -17,7 +15,6 @@ class TestJob(IsolatedAsyncioTestCase): ''' Tests the Job class. ''' - @pytest.mark.asyncio async def test_status(self): ''' Tests Job.status @@ -32,7 +29,6 @@ async def test_status(self): status = await job.status() assert status == "COMPLETED" - @pytest.mark.asyncio async def test_output(self): ''' Tests Job.output @@ -57,7 +53,6 @@ async def json_side_effect(): output = await output_task assert output == "OUTPUT" - @pytest.mark.asyncio async def test_cancel(self): ''' Tests Job.cancel @@ -72,10 +67,35 @@ async def test_cancel(self): cancel_result = await job.cancel() assert cancel_result == {"result": "CANCELLED"} -class TestEndpoint: + async def test_output_in_progress_then_completed(self): + '''Tests Job.output when status is initially IN_PROGRESS and then changes to COMPLETED''' + with ( + patch("runpod.endpoint.asyncio.asyncio_runner.asyncio.sleep") as mock_sleep, + patch("aiohttp.ClientSession") as mock_session + ): + mock_resp = MagicMock() + responses = [ + {"status": "IN_PROGRESS"}, + {"status": "COMPLETED"}, + {"output": "OUTPUT"} + ] + + async def json_side_effect(): + if responses: + return responses.pop(0) + return {"status": "IN_PROGRESS"} + + mock_resp.json = json_side_effect + mock_session.get.return_value.__aenter__.return_value = mock_resp + + job = Job("endpoint_id", "job_id", mock_session) + output = await job.output() + assert output == "OUTPUT" + mock_sleep.assert_called_once_with(1) + +class TestEndpoint(IsolatedAsyncioTestCase): ''' Unit tests for the Endpoint class. ''' - @pytest.mark.asyncio async def test_run(self): ''' Tests Endpoint.run @@ -90,6 +110,17 @@ async def test_run(self): job = await endpoint.run({"input": "INPUT"}) assert job.job_id == "job_id" +class TestEndpointInitialization(unittest.TestCase): + '''Tests for the Endpoint class initialization.''' + + def test_endpoint_initialization(self): + '''Tests initialization of Endpoint class.''' + with patch("aiohttp.ClientSession"): + endpoint = Endpoint("endpoint_id", MagicMock()) + self.assertEqual(endpoint.endpoint_url, "https://api.runpod.ai/v2/endpoint_id/run") + self.assertIn("Content-Type", endpoint.headers) + self.assertIn("Authorization", endpoint.headers) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_serverless/test_modules/test_http.py b/tests/test_serverless/test_modules/test_http.py index 02f97c8c..510dc447 100644 --- a/tests/test_serverless/test_modules/test_http.py +++ b/tests/test_serverless/test_modules/test_http.py @@ -1,17 +1,29 @@ ''' Test rp_http.py module. ''' +# pylint: disable=too-few-public-methods -import asyncio +import gc +import json import unittest -from unittest.mock import patch, Mock, AsyncMock +from unittest.mock import patch, AsyncMock import aiohttp -from aiohttp import ClientResponse - -import pytest from runpod.serverless.modules import rp_http +class MockRequestInfo: + ''' Mock aiohttp.RequestInfo class. ''' + + def __init__(self, *args, **kwargs): + del args, kwargs + self.url = "http://test_url" + self.method = "POST" + self.headers = {"Content-Type": "application/json"} + self.real_url = "http://test_url" + + real_url = "http://test_url" + + class TestHTTP(unittest.IsolatedAsyncioTestCase): ''' Test HTTP module. ''' @@ -19,119 +31,126 @@ def setUp(self) -> None: self.job = {"id": "test_id"} self.job_data = {"output": "test_output"} - def test_send_result_exception(self): + def tearDown(self) -> None: + gc.collect() + + + async def test_send_result(self): ''' Test send_result function. ''' - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - with patch('runpod.serverless.modules.rp_http.log') as mock_log: - with patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs: - mock_jobs.return_value = set(['test_id']) - send_return_local = asyncio.run( - rp_http.send_result(Mock(), self.job_data, self.job)) + with patch('runpod.serverless.modules.rp_http.log') as mock_log, \ + patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \ + patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry: + + mock_retry.return_value.post.return_value = AsyncMock() + mock_retry.return_value.post.return_value.__aenter__.return_value.text.return_value = "response text" # pylint: disable=line-too-long - assert send_return_local is None - assert mock_log.debug.call_count == 0 - assert mock_log.error.call_count == 1 + mock_jobs.return_value = set(['test_id']) + send_return_local = await rp_http.send_result(AsyncMock(), self.job_data, self.job) - loop.close() + assert send_return_local is None + assert mock_log.debug.call_count == 1 + assert mock_log.error.call_count == 0 + assert mock_log.info.call_count == 1 - def test_send_result(self): + mock_retry.return_value.post.assert_called_with( + 'JOB_DONE_URL', + data=str(json.dumps(self.job_data, ensure_ascii=False)), + headers={ + "charset": "utf-8", + "Content-Type": "application/x-www-form-urlencoded" + }, + raise_for_status=True + ) + + + async def test_send_result_client_response_error(self): ''' - Test send_result function. + Test send_result function with ClientResponseError. + ''' + def mock_request_info_init(self, *args, **kwargs): + ''' + Mock aiohttp.RequestInfo.__init__ method. + ''' + del args, kwargs + self.url = "http://test_url" + self.method = "POST" + self.headers = {"Content-Type": "application/json"} + self.real_url = "http://test_url" + + with patch('runpod.serverless.modules.rp_http.log') as mock_log, \ + patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \ + patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry, \ + patch.object(aiohttp.RequestInfo, "__init__", mock_request_info_init): + + mock_retry.side_effect = aiohttp.ClientResponseError( + request_info=MockRequestInfo, + history=None, + status=500, + message="Error message" + ) + + mock_jobs.return_value = set(['test_id']) + send_return_local = await rp_http.send_result(AsyncMock(), self.job_data, self.job) + + assert send_return_local is None + assert mock_log.debug.call_count == 0 + assert mock_log.error.call_count == 1 + assert mock_log.info.call_count == 1 + + + async def test_send_result_type_error(self): ''' - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - with patch('runpod.serverless.modules.rp_http.log') as mock_log,\ - patch('runpod.serverless.modules.rp_http.transmit', new=AsyncMock()) as mock_transmit: - with patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs: - mock_jobs.return_value = set(['test_id']) + Test send_result function with TypeError. + ''' + with patch('runpod.serverless.modules.rp_http.log') as mock_log, \ + patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \ + patch('runpod.serverless.modules.rp_http.json.dumps') as mock_dumps, \ + patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry: + + mock_dumps.side_effect = TypeError("Forced exception") - send_return_local = asyncio.run( - rp_http.send_result(Mock(), self.job_data, self.job)) + mock_jobs.return_value = set(['test_id']) + send_return_local = await rp_http.send_result("No Session", self.job_data, self.job) - assert send_return_local is None - assert mock_log.debug.call_count == 1 - assert mock_log.error.call_count == 0 - assert mock_log.info.call_count == 1 - mock_transmit.assert_called_once() + assert send_return_local is None + assert mock_log.debug.call_count == 0 + assert mock_log.error.call_count == 1 + assert mock_log.info.call_count == 1 + assert mock_retry.return_value.post.call_count == 0 + mock_log.error.assert_called_with("Error while returning job result test_id: Forced exception") # pylint: disable=line-too-long - loop.close() async def test_stream_result(self): ''' Test stream_result function. ''' - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - with patch('runpod.serverless.modules.rp_http.log') as mock_log,\ - patch('runpod.serverless.modules.rp_http.transmit', new=AsyncMock()) as mock_transmit: - with patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs: - mock_jobs.return_value = set(['test_id']) - rp_http.IS_LOCAL_TEST = True - send_return_local = asyncio.run( - rp_http.stream_result(Mock(), self.job_data, self.job)) - - assert send_return_local is None - assert mock_log.debug.call_count == 1 - assert mock_log.error.call_count == 0 - assert mock_log.info.call_count == 0 - mock_transmit.assert_called_once() - - loop.close() - - def test_stream_result_exception(self): - ''' - Test stream_result function exception. - ''' - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - with patch('runpod.serverless.modules.rp_http.log') as mock_log: - with patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs: - mock_jobs.return_value = set(['test_id']) - send_return_local = asyncio.run( - rp_http.stream_result(Mock(), self.job_data, self.job)) - - assert send_return_local is None - assert mock_log.debug.call_count == 0 - assert mock_log.error.call_count == 1 - - loop.close() - -@pytest.mark.asyncio -@patch('aiohttp.ClientSession.post') -async def test_transmit(mock_post): - ''' - Tests the transmit function - ''' - # Mock the session and job data - session = Mock() - job_data = {"output": "test_output"} - url = "http://example.com" - - # Mock the response from the post request - mock_response = AsyncMock(spec=ClientResponse) - mock_response.text.return_value = "response text" - - # Mock context manager returned by post - async_context_manager = AsyncMock() - async_context_manager.__aenter__.return_value = mock_response - - # Mock post method on session - mock_post.return_value = async_context_manager - - # Mock session - session = aiohttp.ClientSession() - - # Call the function - await rp_http.transmit(session, job_data, url) - - # Check that post was called with the correct arguments - mock_post.assert_called_once_with(url, data=job_data, headers={ - "charset": "utf-8", - "Content-Type": "application/x-www-form-urlencoded" - }, raise_for_status=True) - - # Check that text() method was called on the response - mock_response.text.assert_called_once() + with patch('runpod.serverless.modules.rp_http.log') as mock_log, \ + patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \ + patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry: + + mock_retry.return_value.post.return_value = AsyncMock() + mock_retry.return_value.post.return_value.__aenter__.return_value.text.return_value = "response text" # pylint: disable=line-too-long + + mock_jobs.return_value = set(['test_id']) + send_return_local = await rp_http.stream_result(AsyncMock(), self.job_data, self.job) + + assert send_return_local is None + assert mock_log.debug.call_count == 1 + assert mock_log.error.call_count == 0 + assert mock_log.info.call_count == 0 + + mock_retry.return_value.post.assert_called_with( + 'JOB_STREAM_URL', + data=str(json.dumps(self.job_data, ensure_ascii=False)), + headers={ + "charset": "utf-8", + "Content-Type": "application/x-www-form-urlencoded" + }, + raise_for_status=True + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_serverless/test_modules/test_job.py b/tests/test_serverless/test_modules/test_job.py index 158ff09b..d12e3753 100644 --- a/tests/test_serverless/test_modules/test_job.py +++ b/tests/test_serverless/test_modules/test_job.py @@ -5,7 +5,6 @@ from unittest.mock import Mock, patch from unittest import IsolatedAsyncioTestCase -import pytest from aiohttp import ClientResponse from aiohttp.test_utils import make_mocked_coro @@ -14,7 +13,6 @@ class TestJob(IsolatedAsyncioTestCase): ''' Tests the Job class. ''' - @pytest.mark.asyncio async def test_get_job_200(self): ''' Tests the get_job function @@ -46,7 +44,6 @@ async def test_get_job_200(self): assert job == {"id": "123", "input": {"number": 1}} - @pytest.mark.asyncio async def test_get_job_204(self): ''' Tests the get_job function with a 204 response @@ -65,7 +62,6 @@ async def test_get_job_204(self): assert job is None assert mock_session_204.get.call_count == 1 - @pytest.mark.asyncio async def test_get_job_500(self): ''' Tests the get_job function with a 500 response @@ -83,7 +79,6 @@ async def test_get_job_500(self): assert job is None - @pytest.mark.asyncio async def test_get_job_no_id(self): ''' Tests the get_job function with a 200 response but no id @@ -104,7 +99,6 @@ async def test_get_job_no_id(self): assert job is None assert mock_log.error.call_count == 1 - @pytest.mark.asyncio async def test_get_job_no_input(self): ''' Tests the get_job function with a 200 response but no input @@ -125,7 +119,6 @@ async def test_get_job_no_input(self): assert job is None assert mock_log.error.call_count == 1 - @pytest.mark.asyncio async def test_get_job_exception(self): ''' Tests the get_job function with an exception diff --git a/tests/test_serverless/test_modules/test_retry.py b/tests/test_serverless/test_modules/test_retry.py deleted file mode 100644 index e6f81028..00000000 --- a/tests/test_serverless/test_modules/test_retry.py +++ /dev/null @@ -1,42 +0,0 @@ -''' Unit tests for the retry module. ''' - -from unittest.mock import patch -import pytest -from runpod.serverless.modules.retry import retry - -@retry(max_attempts=3, base_delay=1, max_delay=10) -async def func_raises_exception(): - ''' - A test function to be decorated with the retry decorator. - ''' - raise Exception("Test Exception") # pylint: disable=broad-exception-raised - -@retry(max_attempts=3, base_delay=1, max_delay=10) -async def func_returns_success(): - ''' - A test function to be decorated with the retry decorator. - ''' - return "Success" - -@pytest.mark.asyncio -async def test_retry(): - ''' - Test the retry decorator. - ''' - with patch("asyncio.sleep", return_value=None) as mock_sleep: - with pytest.raises(Exception) as excinfo: - await func_raises_exception() - assert str(excinfo.value) == "Test Exception" - assert mock_sleep.call_count == 2 - assert 0.5 <= mock_sleep.call_args_list[0][0][0] <= 1.5 # First call - assert 1.0 <= mock_sleep.call_args_list[1][0][0] - -@pytest.mark.asyncio -async def test_retry_success(): - ''' - Test the retry decorator. - ''' - with patch("asyncio.sleep", return_value=None) as mock_sleep: - result = await func_returns_success() - assert result == "Success" - assert mock_sleep.call_count == 0 diff --git a/tests/test_serverless/test_utils/test_upload.py b/tests/test_serverless/test_utils/test_upload.py index 8ce558db..e36f67e6 100644 --- a/tests/test_serverless/test_utils/test_upload.py +++ b/tests/test_serverless/test_utils/test_upload.py @@ -22,9 +22,13 @@ class TestBotoConfig(unittest.TestCase): ''' Tests for boto config ''' def setUp(self) -> None: + self.original_environ = os.environ.copy() self.mock_transfer_config = MagicMock() self.mock_boto_client = MagicMock() + def tearDown(self): + os.environ = self.original_environ + def test_get_boto_client(self): ''' Tests get_boto_client diff --git a/tests/test_serverless/test_worker.py b/tests/test_serverless/test_worker.py index b94fdf8f..31f0b9f3 100644 --- a/tests/test_serverless/test_worker.py +++ b/tests/test_serverless/test_worker.py @@ -5,7 +5,6 @@ from unittest.mock import patch, mock_open, Mock, MagicMock from unittest import IsolatedAsyncioTestCase -import pytest import nest_asyncio import runpod @@ -162,7 +161,6 @@ def setUp(self): } } - @pytest.mark.asyncio @patch("aiohttp.ClientSession") @patch("runpod.serverless.modules.rp_scale.get_job") @patch("runpod.serverless.worker.run_job") @@ -196,7 +194,6 @@ async def test_run_worker( assert mock_stream_result.called is False assert mock_session.called - @pytest.mark.asyncio @patch("runpod.serverless.modules.rp_scale.get_job") @patch("runpod.serverless.worker.run_job") @patch("runpod.serverless.worker.stream_result") @@ -229,7 +226,6 @@ async def test_run_worker_generator_handler( _, args, _ = mock_send_result.mock_calls[0] assert args[1] == {'output': [], 'stopPod': True} - @pytest.mark.asyncio @patch("runpod.serverless.modules.rp_scale.get_job") @patch("runpod.serverless.worker.run_job") @patch("runpod.serverless.worker.stream_result") @@ -262,7 +258,6 @@ async def test_run_worker_generator_handler_exception( _, args, _ = mock_send_result.mock_calls[0] assert 'error' in args[1] - @pytest.mark.asyncio @patch("runpod.serverless.modules.rp_scale.get_job") @patch("runpod.serverless.worker.run_job") @patch("runpod.serverless.worker.stream_result") @@ -297,7 +292,6 @@ async def test_run_worker_generator_aggregate_handler( _, args, _ = mock_send_result.mock_calls[0] assert args[1] == {'output': ['test1', 'test2'], 'stopPod': True} - @pytest.mark.asyncio @patch("aiohttp.ClientSession") @patch("runpod.serverless.modules.rp_scale.get_job") @patch("runpod.serverless.worker.run_job") @@ -370,7 +364,6 @@ def concurrency_controller(): assert mock_set_config_args.called - @pytest.mark.asyncio @patch("runpod.serverless.modules.rp_scale.get_job") @patch("runpod.serverless.worker.run_job") @patch("runpod.serverless.worker.send_result") @@ -430,7 +423,6 @@ def mock_is_alive(): assert mock_run_job.call_count == 46 assert mock_send_result.call_count == 46 - @pytest.mark.asyncio @patch("runpod.serverless.modules.rp_scale.get_job") @patch("runpod.serverless.worker.run_job") @patch("runpod.serverless.worker.send_result")