diff --git a/CHANGELOG.md b/CHANGELOG.md index ef7c9084..222e8472 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ - Robust `get_job` error handling - `project.toml` now includes required dependencies +--- + ## Release 1.3.4 (11/14/23) ### Changed diff --git a/requirements.txt b/requirements.txt index d1ee4ce4..a9fca489 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -aiohttp >= 3.8.6 +aiohttp >= 3.8.6, < 3.9.0 aiohttp-retry >= 2.8.3 backoff >= 2.2.1 diff --git a/runpod/serverless/modules/rp_http.py b/runpod/serverless/modules/rp_http.py index 213fd42b..a0d07968 100644 --- a/runpod/serverless/modules/rp_http.py +++ b/runpod/serverless/modules/rp_http.py @@ -20,7 +20,7 @@ job_list = Jobs() -async def _transmit(client_session, url, job_data ): +async def _transmit(client_session, url, job_data): """ Wrapper for transmitting results via POST. """ @@ -31,7 +31,7 @@ async def _transmit(client_session, url, job_data ): "data": job_data, "headers": {"charset": "utf-8", "Content-Type": "application/x-www-form-urlencoded"}, "raise_for_status": True - } + } async with retry_client.post(url, **kwargs) as client_response: await client_response.text() @@ -46,18 +46,18 @@ async def _handle_result(session, job_data, job, url_template, log_message): url = url_template.replace('$ID', job['id']) await _transmit(session, url, serialized_job_data) - log.debug(f"{job['id']} | {log_message}") + log.debug(f"{log_message}", job['id']) except aiohttp.ClientError as err: - log.error(f"{job['id']} | Failed to return job results. | {err}") + log.error(f"Failed to return job results. | {err}", job['id']) except (TypeError, RuntimeError) as err: - log.error(f"Error while returning job result {job['id']}: {err}") + log.error(f"Error while returning job result. | {err}", job['id']) finally: if url_template == JOB_DONE_URL and job_data.get('status', None) != 'IN_PROGRESS': job_list.remove_job(job["id"]) - log.info(f'{job["id"]} | Finished') + log.info("Finished.", job['id']) async def send_result(session, job_data, job): diff --git a/runpod/serverless/worker.py b/runpod/serverless/worker.py index 13e6d019..f9e1ae7e 100644 --- a/runpod/serverless/worker.py +++ b/runpod/serverless/worker.py @@ -22,8 +22,6 @@ job_list = Jobs() heartbeat = Heartbeat() -_TIMEOUT = aiohttp.ClientTimeout(total=300, connect=2, sock_connect=2) - def _get_auth_header() -> Dict[str, str]: ''' @@ -94,8 +92,9 @@ async def run_worker(config: Dict[str, Any]) -> None: heartbeat.start_ping() client_session = aiohttp.ClientSession( - connector=aiohttp.TCPConnector(limit=None), - headers=_get_auth_header(), timeout=_TIMEOUT + connector=aiohttp.TCPConnector(limit=0), + headers=_get_auth_header(), + timeout=aiohttp.ClientTimeout(total=300, connect=2, sock_connect=2) ) async with client_session as session: diff --git a/tests/test_serverless/test_modules/test_http.py b/tests/test_serverless/test_modules/test_http.py index 510dc447..eed7fd81 100644 --- a/tests/test_serverless/test_modules/test_http.py +++ b/tests/test_serverless/test_modules/test_http.py @@ -11,6 +11,7 @@ from runpod.serverless.modules import rp_http + class MockRequestInfo: ''' Mock aiohttp.RequestInfo class. ''' @@ -34,17 +35,16 @@ def setUp(self) -> None: def tearDown(self) -> None: gc.collect() - async def test_send_result(self): ''' Test send_result function. ''' with patch('runpod.serverless.modules.rp_http.log') as mock_log, \ - patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \ - patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry: + patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \ + patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry: mock_retry.return_value.post.return_value = AsyncMock() - mock_retry.return_value.post.return_value.__aenter__.return_value.text.return_value = "response text" # pylint: disable=line-too-long + mock_retry.return_value.post.return_value.__aenter__.return_value.text.return_value = "response text" # pylint: disable=line-too-long mock_jobs.return_value = set(['test_id']) send_return_local = await rp_http.send_result(AsyncMock(), self.job_data, self.job) @@ -64,11 +64,11 @@ async def test_send_result(self): raise_for_status=True ) - async def test_send_result_client_response_error(self): ''' Test send_result function with ClientResponseError. ''' + def mock_request_info_init(self, *args, **kwargs): ''' Mock aiohttp.RequestInfo.__init__ method. @@ -80,9 +80,9 @@ def mock_request_info_init(self, *args, **kwargs): self.real_url = "http://test_url" with patch('runpod.serverless.modules.rp_http.log') as mock_log, \ - patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \ - patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry, \ - patch.object(aiohttp.RequestInfo, "__init__", mock_request_info_init): + patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \ + patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry, \ + patch.object(aiohttp.RequestInfo, "__init__", mock_request_info_init): mock_retry.side_effect = aiohttp.ClientResponseError( request_info=MockRequestInfo, @@ -99,15 +99,14 @@ def mock_request_info_init(self, *args, **kwargs): assert mock_log.error.call_count == 1 assert mock_log.info.call_count == 1 - async def test_send_result_type_error(self): ''' Test send_result function with TypeError. ''' with patch('runpod.serverless.modules.rp_http.log') as mock_log, \ - patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \ - patch('runpod.serverless.modules.rp_http.json.dumps') as mock_dumps, \ - patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry: + patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \ + patch('runpod.serverless.modules.rp_http.json.dumps') as mock_dumps, \ + patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry: mock_dumps.side_effect = TypeError("Forced exception") @@ -119,19 +118,19 @@ async def test_send_result_type_error(self): assert mock_log.error.call_count == 1 assert mock_log.info.call_count == 1 assert mock_retry.return_value.post.call_count == 0 - mock_log.error.assert_called_with("Error while returning job result test_id: Forced exception") # pylint: disable=line-too-long - + mock_log.error.assert_called_with( + 'Error while returning job result. | Forced exception', 'test_id') # pylint: disable=line-too-long async def test_stream_result(self): ''' Test stream_result function. ''' with patch('runpod.serverless.modules.rp_http.log') as mock_log, \ - patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \ - patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry: + patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \ + patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry: mock_retry.return_value.post.return_value = AsyncMock() - mock_retry.return_value.post.return_value.__aenter__.return_value.text.return_value = "response text" # pylint: disable=line-too-long + mock_retry.return_value.post.return_value.__aenter__.return_value.text.return_value = "response text" # pylint: disable=line-too-long mock_jobs.return_value = set(['test_id']) send_return_local = await rp_http.stream_result(AsyncMock(), self.job_data, self.job)