diff --git a/examples/serverless/logger.py b/examples/serverless/logger.py index defb3d58..67e74967 100644 --- a/examples/serverless/logger.py +++ b/examples/serverless/logger.py @@ -18,10 +18,10 @@ # ERROR | An error message -log.debug('A debug message', job_id=JOB_ID) -log.info('An info message', job_id=JOB_ID) -log.warn('A warning message', job_id=JOB_ID) -log.error('An error message', job_id=JOB_ID) +log.debug('A debug message', request_id=JOB_ID) +log.info('An info message', request_id=JOB_ID) +log.warn('A warning message', request_id=JOB_ID) +log.error('An error message', request_id=JOB_ID) # Output: # {"requestId": "1234567890", "message": "A debug message", "level": "DEBUG"} diff --git a/runpod/serverless/modules/rp_job.py b/runpod/serverless/modules/rp_job.py index 411d6874..3498bacb 100644 --- a/runpod/serverless/modules/rp_job.py +++ b/runpod/serverless/modules/rp_job.py @@ -70,7 +70,7 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any] continue next_job = await response.json() - log.debug(f"Received Job | {next_job}") + log.debug(f"Request Received | {next_job}") # Check if the job is valid job_id = next_job.get("id", None) @@ -94,11 +94,11 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any] if not retry: return None - log.debug(f"{next_job['id']} | Valid Job Confirmed") + log.debug("Confirmed valid request.", next_job['id']) if next_job: job_list.add_job(next_job["id"]) - log.debug(f"{next_job['id']} | Added Job ID") + log.debug("Request ID added.", next_job['id']) return next_job @@ -108,14 +108,14 @@ async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]: Run the job using the handler. Returns the job output or error. """ - log.info(f'{job["id"]} | Started') + log.info('Started', job["id"]) run_result = {"error": "No output from handler."} try: handler_return = handler(job) job_output = await handler_return if inspect.isawaitable(handler_return) else handler_return - log.debug(f'{job["id"]} | Handler output: {job_output}') + log.debug(f'Handler output: {job_output}', job["id"]) if isinstance(job_output, dict): error_msg = job_output.pop("error", None) @@ -149,12 +149,12 @@ async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]: "runpod_version": runpod_version } - log.error(f'{job["id"]} | Captured Handler Exception') + log.error('Captured Handler Exception', job["id"]) log.error(json.dumps(error_info, indent=4)) run_result = {"error": json.dumps(error_info)} finally: - log.debug(f'{job["id"]} | run_job return: {run_result}') + log.debug(f'run_job return: {run_result}', job["id"]) return run_result @@ -175,7 +175,7 @@ async def run_job_generator( for output_partial in job_output: yield {"output": output_partial} except Exception as err: # pylint: disable=broad-except - log.error(f'Error while running job {job["id"]}: {err}') + log.error(err, job["id"]) yield {"error": f"handler: {str(err)} \ntraceback: {traceback.format_exc()}"} finally: - log.info(f'{job["id"]} | Finished ') + log.info('Finished', job["id"]) diff --git a/runpod/serverless/modules/rp_logger.py b/runpod/serverless/modules/rp_logger.py index 3811bb8a..86725050 100644 --- a/runpod/serverless/modules/rp_logger.py +++ b/runpod/serverless/modules/rp_logger.py @@ -71,7 +71,7 @@ def log(self, message, message_level='INFO', job_id=None): if level_index > LOG_LEVELS.index(message_level) and message_level != 'TIP': return - if job_id: + if os.environ.get('RUNPOD_ENDPOINT_ID'): log_json = { 'requestId': job_id, 'message': message, @@ -80,6 +80,9 @@ def log(self, message, message_level='INFO', job_id=None): print(json.dumps(log_json), flush=True) return + if job_id: + message = f'{job_id} | {message}' + print(f'{message_level.ljust(7)}| {message}', flush=True) return @@ -92,29 +95,29 @@ def secret(self, secret_name, secret): redacted_secret = secret[0] + '*' * (len(secret)-2) + secret[-1] self.info(f"{secret_name}: {redacted_secret}") - def debug(self, message, job_id=None): + def debug(self, message, request_id=None): ''' debug log ''' - self.log(message, 'DEBUG', job_id) + self.log(message, 'DEBUG', request_id) - def info(self, message, job_id=None): + def info(self, message, request_id=None): ''' info log ''' - self.log(message, 'INFO', job_id) + self.log(message, 'INFO', request_id) - def warn(self, message, job_id=None): + def warn(self, message, request_id=None): ''' warn log ''' - self.log(message, 'WARN', job_id) + self.log(message, 'WARN', request_id) - def error(self, message, job_id=None): + def error(self, message, request_id=None): ''' error log ''' - self.log(message, 'ERROR', job_id) + self.log(message, 'ERROR', request_id) def tip(self, message): ''' diff --git a/runpod/serverless/worker.py b/runpod/serverless/worker.py index f8694863..13e6d019 100644 --- a/runpod/serverless/worker.py +++ b/runpod/serverless/worker.py @@ -63,7 +63,7 @@ async def _process_job(job, session, job_scaler, config): # If refresh_worker is set, pod will be reset after job is complete. if config.get("refresh_worker", False): - log.info(f"refresh_worker | Flag set, stopping pod after job {job['id']}.") + log.info("refresh_worker flag set, stopping pod after job.", job['id']) job_result["stopPod"] = True job_scaler.kill_worker() diff --git a/tests/test_serverless/test_modules/test_job.py b/tests/test_serverless/test_modules/test_job.py index c6897f7c..588bfd76 100644 --- a/tests/test_serverless/test_modules/test_job.py +++ b/tests/test_serverless/test_modules/test_job.py @@ -10,6 +10,7 @@ from runpod.serverless.modules import rp_job + class TestJob(IsolatedAsyncioTestCase): ''' Tests the Job class. ''' @@ -38,19 +39,18 @@ async def test_get_job_200(self): response4.json = make_mocked_coro(return_value={"id": "123", "input": {"number": 1}}) with patch("aiohttp.ClientSession") as mock_session, \ - patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): + patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): # Set side_effect to a list of mock responses mock_session.get.return_value.__aenter__.side_effect = [ response1, response2, response3, response4 - ] + ] job = await rp_job.get_job(mock_session, retry=True) # Assertions for the success case assert job == {"id": "123", "input": {"number": 1}} - async def test_get_job_204(self): ''' Tests the get_job function with a 204 response @@ -61,7 +61,7 @@ async def test_get_job_204(self): response_204.json = make_mocked_coro(return_value=None) with patch("aiohttp.ClientSession") as mock_session_204, \ - patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): + patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): mock_session_204.get.return_value.__aenter__.return_value = response_204 job = await rp_job.get_job(mock_session_204, retry=False) @@ -78,14 +78,13 @@ async def test_get_job_400(self): response_400.status = 400 with patch("aiohttp.ClientSession") as mock_session_400, \ - patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): + patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): mock_session_400.get.return_value.__aenter__.return_value = response_400 job = await rp_job.get_job(mock_session_400, retry=False) assert job is None - async def test_get_job_500(self): ''' Tests the get_job function with a 500 response @@ -95,14 +94,13 @@ async def test_get_job_500(self): response_500.status = 500 with patch("aiohttp.ClientSession") as mock_session_500, \ - patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): + patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): mock_session_500.get.return_value.__aenter__.return_value = response_500 job = await rp_job.get_job(mock_session_500, retry=False) assert job is None - async def test_get_job_no_id(self): ''' Tests the get_job function with a 200 response but no id @@ -111,10 +109,9 @@ async def test_get_job_no_id(self): response.status = 200 response.json = make_mocked_coro(return_value={}) - with patch("aiohttp.ClientSession") as mock_session, \ - patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \ - patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): + patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \ + patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): mock_session.get.return_value.__aenter__.return_value = response @@ -131,10 +128,9 @@ async def test_get_job_no_input(self): response.status = 200 response.json = make_mocked_coro(return_value={"id": "123"}) - with patch("aiohttp.ClientSession") as mock_session, \ - patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \ - patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): + patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \ + patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): mock_session.get.return_value.__aenter__.return_value = response @@ -152,8 +148,8 @@ async def test_get_job_exception(self): response_exception.status = 200 with patch("aiohttp.ClientSession") as mock_session_exception, \ - patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \ - patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): + patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \ + patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"): mock_session_exception.get.return_value.__aenter__.side_effect = Exception job = await rp_job.get_job(mock_session_exception, retry=False) @@ -161,6 +157,7 @@ async def test_get_job_exception(self): assert job is None assert mock_log.error.call_count == 1 + class TestRunJob(IsolatedAsyncioTestCase): ''' Tests the run_job function ''' @@ -184,7 +181,7 @@ async def test_simple_job(self): mock_handler.return_value = ['test1', 'test2'] job_result_list = await rp_job.run_job(mock_handler, self.sample_job) - assert job_result_list == {"output":["test1", "test2"]} + assert job_result_list == {"output": ["test1", "test2"]} mock_handler.return_value = 123 job_result_int = await rp_job.run_job(mock_handler, self.sample_job) @@ -249,14 +246,14 @@ async def test_job_with_exception(self): class TestRunJobGenerator(IsolatedAsyncioTestCase): ''' Tests the run_job_generator function ''' - def handler_gen_success(self, job): # pylint: disable=unused-argument + def handler_gen_success(self, job): # pylint: disable=unused-argument ''' Test handler that returns a generator. ''' yield "partial_output_1" yield "partial_output_2" - async def handler_async_gen_success(self, job): # pylint: disable=unused-argument + async def handler_async_gen_success(self, job): # pylint: disable=unused-argument ''' Test handler that returns an async generator. ''' @@ -267,7 +264,7 @@ def handler_fail(self, job): ''' Test handler that raises an exception. ''' - raise Exception("Test Exception") # pylint: disable=broad-exception-raised + raise Exception("Test Exception") # pylint: disable=broad-exception-raised async def test_run_job_generator_success(self): ''' @@ -282,7 +279,7 @@ async def test_run_job_generator_success(self): assert result == [{"output": "partial_output_1"}, {"output": "partial_output_2"}] assert mock_log.error.call_count == 0 assert mock_log.info.call_count == 1 - mock_log.info.assert_called_with('123 | Finished ') + mock_log.info.assert_called_with('Finished', '123') async def test_run_job_generator_success_async(self): ''' @@ -297,7 +294,7 @@ async def test_run_job_generator_success_async(self): assert result == [{"output": "partial_output_1"}, {"output": "partial_output_2"}] assert mock_log.error.call_count == 0 assert mock_log.info.call_count == 1 - mock_log.info.assert_called_with('123 | Finished ') + mock_log.info.assert_called_with('Finished', '123') async def test_run_job_generator_exception(self): ''' @@ -313,4 +310,4 @@ async def test_run_job_generator_exception(self): assert "error" in result[0] assert mock_log.error.call_count == 1 assert mock_log.info.call_count == 1 - mock_log.info.assert_called_with('123 | Finished ') + mock_log.info.assert_called_with('Finished', '123')