Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions examples/serverless/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
# ERROR | An error message


log.debug('A debug message', job_id=JOB_ID)
log.info('An info message', job_id=JOB_ID)
log.warn('A warning message', job_id=JOB_ID)
log.error('An error message', job_id=JOB_ID)
log.debug('A debug message', request_id=JOB_ID)
log.info('An info message', request_id=JOB_ID)
log.warn('A warning message', request_id=JOB_ID)
log.error('An error message', request_id=JOB_ID)

# Output:
# {"requestId": "1234567890", "message": "A debug message", "level": "DEBUG"}
Expand Down
18 changes: 9 additions & 9 deletions runpod/serverless/modules/rp_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any]
continue

next_job = await response.json()
log.debug(f"Received Job | {next_job}")
log.debug(f"Request Received | {next_job}")

# Check if the job is valid
job_id = next_job.get("id", None)
Expand All @@ -94,11 +94,11 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any]
if not retry:
return None

log.debug(f"{next_job['id']} | Valid Job Confirmed")
log.debug("Confirmed valid request.", next_job['id'])

if next_job:
job_list.add_job(next_job["id"])
log.debug(f"{next_job['id']} | Added Job ID")
log.debug("Request ID added.", next_job['id'])

return next_job

Expand All @@ -108,14 +108,14 @@ async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
Run the job using the handler.
Returns the job output or error.
"""
log.info(f'{job["id"]} | Started')
log.info('Started', job["id"])
run_result = {"error": "No output from handler."}

try:
handler_return = handler(job)
job_output = await handler_return if inspect.isawaitable(handler_return) else handler_return

log.debug(f'{job["id"]} | Handler output: {job_output}')
log.debug(f'Handler output: {job_output}', job["id"])

if isinstance(job_output, dict):
error_msg = job_output.pop("error", None)
Expand Down Expand Up @@ -149,12 +149,12 @@ async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
"runpod_version": runpod_version
}

log.error(f'{job["id"]} | Captured Handler Exception')
log.error('Captured Handler Exception', job["id"])
log.error(json.dumps(error_info, indent=4))
run_result = {"error": json.dumps(error_info)}

finally:
log.debug(f'{job["id"]} | run_job return: {run_result}')
log.debug(f'run_job return: {run_result}', job["id"])

return run_result

Expand All @@ -175,7 +175,7 @@ async def run_job_generator(
for output_partial in job_output:
yield {"output": output_partial}
except Exception as err: # pylint: disable=broad-except
log.error(f'Error while running job {job["id"]}: {err}')
log.error(err, job["id"])
yield {"error": f"handler: {str(err)} \ntraceback: {traceback.format_exc()}"}
finally:
log.info(f'{job["id"]} | Finished ')
log.info('Finished', job["id"])
21 changes: 12 additions & 9 deletions runpod/serverless/modules/rp_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def log(self, message, message_level='INFO', job_id=None):
if level_index > LOG_LEVELS.index(message_level) and message_level != 'TIP':
return

if job_id:
if os.environ.get('RUNPOD_ENDPOINT_ID'):
log_json = {
'requestId': job_id,
'message': message,
Expand All @@ -80,6 +80,9 @@ def log(self, message, message_level='INFO', job_id=None):
print(json.dumps(log_json), flush=True)
return

if job_id:
message = f'{job_id} | {message}'

print(f'{message_level.ljust(7)}| {message}', flush=True)
return

Expand All @@ -92,29 +95,29 @@ def secret(self, secret_name, secret):
redacted_secret = secret[0] + '*' * (len(secret)-2) + secret[-1]
self.info(f"{secret_name}: {redacted_secret}")

def debug(self, message, job_id=None):
def debug(self, message, request_id=None):
'''
debug log
'''
self.log(message, 'DEBUG', job_id)
self.log(message, 'DEBUG', request_id)

def info(self, message, job_id=None):
def info(self, message, request_id=None):
'''
info log
'''
self.log(message, 'INFO', job_id)
self.log(message, 'INFO', request_id)

def warn(self, message, job_id=None):
def warn(self, message, request_id=None):
'''
warn log
'''
self.log(message, 'WARN', job_id)
self.log(message, 'WARN', request_id)

def error(self, message, job_id=None):
def error(self, message, request_id=None):
'''
error log
'''
self.log(message, 'ERROR', job_id)
self.log(message, 'ERROR', request_id)

def tip(self, message):
'''
Expand Down
2 changes: 1 addition & 1 deletion runpod/serverless/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async def _process_job(job, session, job_scaler, config):

# If refresh_worker is set, pod will be reset after job is complete.
if config.get("refresh_worker", False):
log.info(f"refresh_worker | Flag set, stopping pod after job {job['id']}.")
log.info("refresh_worker flag set, stopping pod after job.", job['id'])
job_result["stopPod"] = True
job_scaler.kill_worker()

Expand Down
43 changes: 20 additions & 23 deletions tests/test_serverless/test_modules/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from runpod.serverless.modules import rp_job


class TestJob(IsolatedAsyncioTestCase):
''' Tests the Job class. '''

Expand Down Expand Up @@ -38,19 +39,18 @@ async def test_get_job_200(self):
response4.json = make_mocked_coro(return_value={"id": "123", "input": {"number": 1}})

with patch("aiohttp.ClientSession") as mock_session, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):

# Set side_effect to a list of mock responses
mock_session.get.return_value.__aenter__.side_effect = [
response1, response2, response3, response4
]
]

job = await rp_job.get_job(mock_session, retry=True)

# Assertions for the success case
assert job == {"id": "123", "input": {"number": 1}}


async def test_get_job_204(self):
'''
Tests the get_job function with a 204 response
Expand All @@ -61,7 +61,7 @@ async def test_get_job_204(self):
response_204.json = make_mocked_coro(return_value=None)

with patch("aiohttp.ClientSession") as mock_session_204, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):

mock_session_204.get.return_value.__aenter__.return_value = response_204
job = await rp_job.get_job(mock_session_204, retry=False)
Expand All @@ -78,14 +78,13 @@ async def test_get_job_400(self):
response_400.status = 400

with patch("aiohttp.ClientSession") as mock_session_400, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):

mock_session_400.get.return_value.__aenter__.return_value = response_400
job = await rp_job.get_job(mock_session_400, retry=False)

assert job is None


async def test_get_job_500(self):
'''
Tests the get_job function with a 500 response
Expand All @@ -95,14 +94,13 @@ async def test_get_job_500(self):
response_500.status = 500

with patch("aiohttp.ClientSession") as mock_session_500, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):

mock_session_500.get.return_value.__aenter__.return_value = response_500
job = await rp_job.get_job(mock_session_500, retry=False)

assert job is None


async def test_get_job_no_id(self):
'''
Tests the get_job function with a 200 response but no id
Expand All @@ -111,10 +109,9 @@ async def test_get_job_no_id(self):
response.status = 200
response.json = make_mocked_coro(return_value={})


with patch("aiohttp.ClientSession") as mock_session, \
patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):
patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):

mock_session.get.return_value.__aenter__.return_value = response

Expand All @@ -131,10 +128,9 @@ async def test_get_job_no_input(self):
response.status = 200
response.json = make_mocked_coro(return_value={"id": "123"})


with patch("aiohttp.ClientSession") as mock_session, \
patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):
patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):

mock_session.get.return_value.__aenter__.return_value = response

Expand All @@ -152,15 +148,16 @@ async def test_get_job_exception(self):
response_exception.status = 200

with patch("aiohttp.ClientSession") as mock_session_exception, \
patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):
patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):

mock_session_exception.get.return_value.__aenter__.side_effect = Exception
job = await rp_job.get_job(mock_session_exception, retry=False)

assert job is None
assert mock_log.error.call_count == 1


class TestRunJob(IsolatedAsyncioTestCase):
''' Tests the run_job function '''

Expand All @@ -184,7 +181,7 @@ async def test_simple_job(self):

mock_handler.return_value = ['test1', 'test2']
job_result_list = await rp_job.run_job(mock_handler, self.sample_job)
assert job_result_list == {"output":["test1", "test2"]}
assert job_result_list == {"output": ["test1", "test2"]}

mock_handler.return_value = 123
job_result_int = await rp_job.run_job(mock_handler, self.sample_job)
Expand Down Expand Up @@ -249,14 +246,14 @@ async def test_job_with_exception(self):
class TestRunJobGenerator(IsolatedAsyncioTestCase):
''' Tests the run_job_generator function '''

def handler_gen_success(self, job): # pylint: disable=unused-argument
def handler_gen_success(self, job): # pylint: disable=unused-argument
'''
Test handler that returns a generator.
'''
yield "partial_output_1"
yield "partial_output_2"

async def handler_async_gen_success(self, job): # pylint: disable=unused-argument
async def handler_async_gen_success(self, job): # pylint: disable=unused-argument
'''
Test handler that returns an async generator.
'''
Expand All @@ -267,7 +264,7 @@ def handler_fail(self, job):
'''
Test handler that raises an exception.
'''
raise Exception("Test Exception") # pylint: disable=broad-exception-raised
raise Exception("Test Exception") # pylint: disable=broad-exception-raised

async def test_run_job_generator_success(self):
'''
Expand All @@ -282,7 +279,7 @@ async def test_run_job_generator_success(self):
assert result == [{"output": "partial_output_1"}, {"output": "partial_output_2"}]
assert mock_log.error.call_count == 0
assert mock_log.info.call_count == 1
mock_log.info.assert_called_with('123 | Finished ')
mock_log.info.assert_called_with('Finished', '123')

async def test_run_job_generator_success_async(self):
'''
Expand All @@ -297,7 +294,7 @@ async def test_run_job_generator_success_async(self):
assert result == [{"output": "partial_output_1"}, {"output": "partial_output_2"}]
assert mock_log.error.call_count == 0
assert mock_log.info.call_count == 1
mock_log.info.assert_called_with('123 | Finished ')
mock_log.info.assert_called_with('Finished', '123')

async def test_run_job_generator_exception(self):
'''
Expand All @@ -313,4 +310,4 @@ async def test_run_job_generator_exception(self):
assert "error" in result[0]
assert mock_log.error.call_count == 1
assert mock_log.info.call_count == 1
mock_log.info.assert_called_with('123 | Finished ')
mock_log.info.assert_called_with('Finished', '123')