Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
- Robust `get_job` error handling
- `project.toml` now includes required dependencies

---

## Release 1.3.4 (11/14/23)

### Changed
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
aiohttp >= 3.8.6
aiohttp >= 3.8.6, < 3.9.0
aiohttp-retry >= 2.8.3

backoff >= 2.2.1
Expand Down
12 changes: 6 additions & 6 deletions runpod/serverless/modules/rp_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
job_list = Jobs()


async def _transmit(client_session, url, job_data ):
async def _transmit(client_session, url, job_data):
"""
Wrapper for transmitting results via POST.
"""
Expand All @@ -31,7 +31,7 @@ async def _transmit(client_session, url, job_data ):
"data": job_data,
"headers": {"charset": "utf-8", "Content-Type": "application/x-www-form-urlencoded"},
"raise_for_status": True
}
}

async with retry_client.post(url, **kwargs) as client_response:
await client_response.text()
Expand All @@ -46,18 +46,18 @@ async def _handle_result(session, job_data, job, url_template, log_message):
url = url_template.replace('$ID', job['id'])

await _transmit(session, url, serialized_job_data)
log.debug(f"{job['id']} | {log_message}")
log.debug(f"{log_message}", job['id'])

except aiohttp.ClientError as err:
log.error(f"{job['id']} | Failed to return job results. | {err}")
log.error(f"Failed to return job results. | {err}", job['id'])

except (TypeError, RuntimeError) as err:
log.error(f"Error while returning job result {job['id']}: {err}")
log.error(f"Error while returning job result. | {err}", job['id'])

finally:
if url_template == JOB_DONE_URL and job_data.get('status', None) != 'IN_PROGRESS':
job_list.remove_job(job["id"])
log.info(f'{job["id"]} | Finished')
log.info("Finished.", job['id'])


async def send_result(session, job_data, job):
Expand Down
7 changes: 3 additions & 4 deletions runpod/serverless/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
job_list = Jobs()
heartbeat = Heartbeat()

_TIMEOUT = aiohttp.ClientTimeout(total=300, connect=2, sock_connect=2)


def _get_auth_header() -> Dict[str, str]:
'''
Expand Down Expand Up @@ -94,8 +92,9 @@ async def run_worker(config: Dict[str, Any]) -> None:
heartbeat.start_ping()

client_session = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(limit=None),
headers=_get_auth_header(), timeout=_TIMEOUT
connector=aiohttp.TCPConnector(limit=0),
headers=_get_auth_header(),
timeout=aiohttp.ClientTimeout(total=300, connect=2, sock_connect=2)
)

async with client_session as session:
Expand Down
33 changes: 16 additions & 17 deletions tests/test_serverless/test_modules/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from runpod.serverless.modules import rp_http


class MockRequestInfo:
''' Mock aiohttp.RequestInfo class. '''

Expand All @@ -34,17 +35,16 @@ def setUp(self) -> None:
def tearDown(self) -> None:
gc.collect()


async def test_send_result(self):
'''
Test send_result function.
'''
with patch('runpod.serverless.modules.rp_http.log') as mock_log, \
patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \
patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry:
patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \
patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry:

mock_retry.return_value.post.return_value = AsyncMock()
mock_retry.return_value.post.return_value.__aenter__.return_value.text.return_value = "response text" # pylint: disable=line-too-long
mock_retry.return_value.post.return_value.__aenter__.return_value.text.return_value = "response text" # pylint: disable=line-too-long

mock_jobs.return_value = set(['test_id'])
send_return_local = await rp_http.send_result(AsyncMock(), self.job_data, self.job)
Expand All @@ -64,11 +64,11 @@ async def test_send_result(self):
raise_for_status=True
)


async def test_send_result_client_response_error(self):
'''
Test send_result function with ClientResponseError.
'''

def mock_request_info_init(self, *args, **kwargs):
'''
Mock aiohttp.RequestInfo.__init__ method.
Expand All @@ -80,9 +80,9 @@ def mock_request_info_init(self, *args, **kwargs):
self.real_url = "http://test_url"

with patch('runpod.serverless.modules.rp_http.log') as mock_log, \
patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \
patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry, \
patch.object(aiohttp.RequestInfo, "__init__", mock_request_info_init):
patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \
patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry, \
patch.object(aiohttp.RequestInfo, "__init__", mock_request_info_init):

mock_retry.side_effect = aiohttp.ClientResponseError(
request_info=MockRequestInfo,
Expand All @@ -99,15 +99,14 @@ def mock_request_info_init(self, *args, **kwargs):
assert mock_log.error.call_count == 1
assert mock_log.info.call_count == 1


async def test_send_result_type_error(self):
'''
Test send_result function with TypeError.
'''
with patch('runpod.serverless.modules.rp_http.log') as mock_log, \
patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \
patch('runpod.serverless.modules.rp_http.json.dumps') as mock_dumps, \
patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry:
patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \
patch('runpod.serverless.modules.rp_http.json.dumps') as mock_dumps, \
patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry:

mock_dumps.side_effect = TypeError("Forced exception")

Expand All @@ -119,19 +118,19 @@ async def test_send_result_type_error(self):
assert mock_log.error.call_count == 1
assert mock_log.info.call_count == 1
assert mock_retry.return_value.post.call_count == 0
mock_log.error.assert_called_with("Error while returning job result test_id: Forced exception") # pylint: disable=line-too-long

mock_log.error.assert_called_with(
'Error while returning job result. | Forced exception', 'test_id') # pylint: disable=line-too-long

async def test_stream_result(self):
'''
Test stream_result function.
'''
with patch('runpod.serverless.modules.rp_http.log') as mock_log, \
patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \
patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry:
patch('runpod.serverless.modules.rp_http.job_list.jobs') as mock_jobs, \
patch('runpod.serverless.modules.rp_http.RetryClient') as mock_retry:

mock_retry.return_value.post.return_value = AsyncMock()
mock_retry.return_value.post.return_value.__aenter__.return_value.text.return_value = "response text" # pylint: disable=line-too-long
mock_retry.return_value.post.return_value.__aenter__.return_value.text.return_value = "response text" # pylint: disable=line-too-long

mock_jobs.return_value = set(['test_id'])
send_return_local = await rp_http.stream_result(AsyncMock(), self.job_data, self.job)
Expand Down