diff --git a/.gitignore b/.gitignore index 675336ed..d9058fca 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ env .env .vscode test_wrapper.py +test_logging.py # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/docs/serverless/worker.md b/docs/serverless/worker.md index 77f9ccf1..b363be56 100644 --- a/docs/serverless/worker.md +++ b/docs/serverless/worker.md @@ -4,6 +4,8 @@ The worker outputs logs to the console at different points in the workers lifecycle. These logs can be used to debug issues with the worker or handler. There are four logging levels that can be used to control the verbosity of the logs: + 0. `NOTSET` - Does not output any logs. + 1. `DEBUG` (Default) - Outputs all logs, including debug logs. 2. `INFO` - Outputs all logs except debug logs. @@ -12,7 +14,17 @@ The worker outputs logs to the console at different points in the workers lifecy 4. `ERROR` - Outputs only error logs. -To set the logging level, set the `RUNPOD_DEBUG_LEVEL` environment variable to one of the above logging levels. For example, to set the logging level to `INFO`, set the `RUNPOD_DEBUG_LEVEL` environment variable to `INFO`. +### Setting the Logging Level + +There are two ways to set the logging level: + + 1. Set the `RUNPOD_DEBUG_LEVEL` environment variable to one of the above logging levels. + + 2. Set the `rp_log_level` argument when calling the file with your handler. If this value is set, it will override the `RUNPOD_DEBUG_LEVEL` environment variable. + + ```python + python worker.py --rp_log_level='INFO' + ``` ## Error Handling diff --git a/runpod/serverless/__init__.py b/runpod/serverless/__init__.py index c95219bb..aa3ee390 100644 --- a/runpod/serverless/__init__.py +++ b/runpod/serverless/__init__.py @@ -9,15 +9,25 @@ from . import work_loop from .modules import rp_fastapi +from .modules.rp_logger import RunPodLogger + +log = RunPodLogger() # ---------------------------------------------------------------------------- # # Run Time Arguments # # ---------------------------------------------------------------------------- # -parser = argparse.ArgumentParser() +# Arguments will be passed in with the config under the key "rp_args" +parser = argparse.ArgumentParser( + prog="runpod", + description="Runpod Serverless Worker Arguments." +) parser.add_argument("--test_input", type=str, default=None, help="Test input for the worker, formatted as JSON.") parser.add_argument("--rp_debugger", action="store_true", default=None, help="Flag to enable the Debugger.") +parser.add_argument("rp_log_level", default=None, + help="""Controls what level of logs are printed to the console. + Options: ERROR, WARN, INFO, and DEBUG.""") def _set_config_args(config) -> dict: @@ -35,6 +45,10 @@ def _set_config_args(config) -> dict: if config["rp_args"]["test_input"]: config["rp_args"]["test_input"] = json.loads(config["rp_args"]["test_input"]) + # Set the log level + if config["rp_args"]["rp_log_level"]: + log.set_level(config["rp_args"]["rp_debug_level"]) + return config diff --git a/runpod/serverless/modules/heartbeat.py b/runpod/serverless/modules/heartbeat.py index 164e5c98..677d6e41 100644 --- a/runpod/serverless/modules/heartbeat.py +++ b/runpod/serverless/modules/heartbeat.py @@ -7,9 +7,11 @@ import requests -import runpod.serverless.modules.logging as log +from runpod.serverless.modules.rp_logger import RunPodLogger from .worker_state import get_current_job_id, PING_URL, PING_INTERVAL +log = RunPodLogger() + _session = requests.Session() _session.headers.update({"Authorization": f"{os.environ.get('RUNPOD_AI_API_KEY')}"}) diff --git a/runpod/serverless/modules/job.py b/runpod/serverless/modules/job.py index 8caa42c7..872208a1 100644 --- a/runpod/serverless/modules/job.py +++ b/runpod/serverless/modules/job.py @@ -9,10 +9,12 @@ import traceback from aiohttp import ClientSession -import runpod.serverless.modules.logging as log +from runpod.serverless.modules.rp_logger import RunPodLogger from .worker_state import IS_LOCAL_TEST, JOB_GET_URL from .rp_tips import check_return_size +log = RunPodLogger() + def _get_local() -> Optional[Dict[str, Any]]: """ diff --git a/runpod/serverless/modules/logging.py b/runpod/serverless/modules/logging.py deleted file mode 100644 index 36c90543..00000000 --- a/runpod/serverless/modules/logging.py +++ /dev/null @@ -1,85 +0,0 @@ -''' PodWorker | modules | logging.py ''' - -import os -from dotenv import load_dotenv - -env_path = os.path.join(os.getcwd(), '.env') -load_dotenv(env_path) # Load environment variables - - -def log(message, level='INFO'): - ''' - Log message to stdout if RUNPOD_DEBUG is true. - ''' - set_level = os.environ.get('RUNPOD_DEBUG_LEVEL', 'INFO').upper() - - if os.environ.get('RUNPOD_DEBUG', 'true').lower() != 'true': - return - - if set_level == 'ERROR' and level != 'ERROR': - return - - if set_level == 'WARN' and level not in ['ERROR', 'WARN']: - return - - if set_level == 'INFO' and level not in ['ERROR', 'WARN', 'INFO']: - return - - level = level.ljust(7) - print(f'{level}| {message}', flush=True) - return - - -def log_secret(secret_name, secret, level='INFO'): - ''' - Censors secrets for logging. - Replaces everything except the first and last characters with * - ''' - if os.environ.get('RUNPOD_POD_ID', None) is not None: - if secret is None: - secret = 'Could not read environment variable.' - log(f"{secret_name}: {secret}", 'ERROR') - else: - secret = str(secret) - redacted_secret = secret[0] + '*' * (len(secret)-2) + secret[-1] - log(f"{secret_name}: {redacted_secret}", level) - - -def error(message): - ''' - error log - ''' - log(message, 'ERROR') - - -def warn(message): - ''' - warn log - ''' - log(message, 'WARN') - - -def info(message): - ''' - info log - ''' - log(message, 'INFO') - - -def debug(message): - ''' - debug log - ''' - log(message, 'DEBUG') - - -def tip(message): - ''' - tip log - ''' - log(message, 'TIP') - - -log_secret('RUNPOD_AI_API_KEY', os.environ.get('RUNPOD_AI_API_KEY', None)) -log_secret('RUNPOD_WEBHOOK_GET_JOB', os.environ.get('RUNPOD_WEBHOOK_GET_JOB', None)) -log_secret('RUNPOD_WEBHOOK_POST_OUTPUT', os.environ.get('RUNPOD_WEBHOOK_POST_OUTPUT', None)) diff --git a/runpod/serverless/modules/rp_http.py b/runpod/serverless/modules/rp_http.py index 4522b853..68b4a72d 100644 --- a/runpod/serverless/modules/rp_http.py +++ b/runpod/serverless/modules/rp_http.py @@ -4,10 +4,12 @@ import json -import runpod.serverless.modules.logging as log +from runpod.serverless.modules.rp_logger import RunPodLogger from .retry import retry from .worker_state import IS_LOCAL_TEST, get_done_url, get_stream_url +log = RunPodLogger() + @retry(max_attempts=3, base_delay=1, max_delay=3) async def transmit(session, job_data, url): diff --git a/runpod/serverless/modules/rp_logger.py b/runpod/serverless/modules/rp_logger.py new file mode 100644 index 00000000..3d959cdf --- /dev/null +++ b/runpod/serverless/modules/rp_logger.py @@ -0,0 +1,113 @@ +''' +PodWorker | modules | logging.py + +Log Levels (Level - Value - Description) + +NOTSET - 0 - No logging is configured, the logging system is effectively disabled. +DEBUG - 1 - Detailed information, typically of interest only when diagnosing problems. (Default) +INFO - 2 - Confirmation that things are working as expected. +WARN - 3 - An indication that something unexpected happened. +ERROR - 4 - Serious problem, the software has not been able to perform some function. +''' + +import os +from dotenv import load_dotenv + +env_path = os.path.join(os.getcwd(), '.env') +load_dotenv(env_path) # Load environment variables + +LOG_LEVELS = ['NOTSET', 'DEBUG', 'INFO', 'WARN', 'ERROR'] + + +def _validate_log_level(log_level): + ''' + Checks the debug level and returns the debug level name. + ''' + if isinstance(log_level, str): + log_level = log_level.upper() + + if log_level not in LOG_LEVELS: + raise ValueError(f'Invalid debug level: {log_level}') + + return log_level + + if isinstance(log_level, int): + if log_level < 0 or log_level > 4: + raise ValueError(f'Invalid debug level: {log_level}') + + return LOG_LEVELS[log_level] + + raise ValueError(f'Invalid debug level: {log_level}') + + +class RunPodLogger: + '''Singleton class for logging.''' + + __instance = None + level = _validate_log_level(os.environ.get('RUNPOD_DEBUG_LEVEL', 'DEBUG')) + + def __new__(cls): + if RunPodLogger.__instance is None: + RunPodLogger.__instance = object.__new__(cls) + return RunPodLogger.__instance + + def set_level(self, new_level): + ''' + Set the debug level for logging. + Can be set to the name or value of the debug level. + ''' + self.level = _validate_log_level(new_level) + self.info(f'Log level set to {self.level}') + + def log(self, message, message_level='INFO'): + ''' + Log message to stdout if RUNPOD_DEBUG is true. + ''' + if self.level == 'NOTSET': + return + + level_index = LOG_LEVELS.index(self.level) + if level_index > LOG_LEVELS.index(message_level) and message_level != 'TIP': + return + + print(f'{message_level.ljust(7)}| {message}', flush=True) + return + + def secret(self, secret_name, secret): + ''' + Censors secrets for logging. + Replaces everything except the first and last characters with * + ''' + secret = str(secret) + redacted_secret = secret[0] + '*' * (len(secret)-2) + secret[-1] + self.info(f"{secret_name}: {redacted_secret}") + + def debug(self, message): + ''' + debug log + ''' + self.log(message, 'DEBUG') + + def info(self, message): + ''' + info log + ''' + self.log(message, 'INFO') + + def warn(self, message): + ''' + warn log + ''' + self.log(message, 'WARN') + + def error(self, message): + ''' + error log + ''' + self.log(message, 'ERROR') + + def tip(self, message): + ''' + tip log + ''' + self.log(message, 'TIP') diff --git a/runpod/serverless/modules/rp_tips.py b/runpod/serverless/modules/rp_tips.py index a44e12c4..b8583f35 100644 --- a/runpod/serverless/modules/rp_tips.py +++ b/runpod/serverless/modules/rp_tips.py @@ -3,7 +3,9 @@ ''' import sys -import runpod.serverless.modules.logging as log +import runpod.serverless.modules.rp_logger as RunPodLogger + +log = RunPodLogger.RunPodLogger() def check_return_size(return_body): diff --git a/runpod/serverless/work_loop.py b/runpod/serverless/work_loop.py index e876d175..62efe074 100644 --- a/runpod/serverless/work_loop.py +++ b/runpod/serverless/work_loop.py @@ -10,13 +10,15 @@ import aiohttp -import runpod.serverless.modules.logging as log +from runpod.serverless.modules.rp_logger import RunPodLogger from .modules.heartbeat import HeartbeatSender from .modules.job import get_job, run_job, run_job_generator from .modules.rp_http import send_result, stream_result from .modules.worker_state import REF_COUNT_ZERO, set_job_id from .utils import rp_debugger +log = RunPodLogger() + _TIMEOUT = aiohttp.ClientTimeout(total=300, connect=2, sock_connect=2) heartbeat = HeartbeatSender() diff --git a/tests/test_serverless/test_module_logger.py b/tests/test_serverless/test_module_logger.py new file mode 100644 index 00000000..8320ebd1 --- /dev/null +++ b/tests/test_serverless/test_module_logger.py @@ -0,0 +1,51 @@ +''' Tests for runpod.serverless.modules.rp_logger ''' + +import unittest +from unittest.mock import patch + +from runpod.serverless.modules import rp_logger + + +class TestLogger(unittest.TestCase): + ''' Tests for rp_logger ''' + + def test_default_log_level(self): + ''' + Tests that the default log level is DEBUG + ''' + logger = rp_logger.RunPodLogger() + + self.assertEqual(logger.level, "DEBUG") + + def test_singleton(self): + ''' + Tests that the logger is a singleton + ''' + logger1 = rp_logger.RunPodLogger() + logger2 = rp_logger.RunPodLogger() + + self.assertIs(logger1, logger2) + + def test_set_log_level(self): + ''' + Tests that the log level can be set + ''' + logger = rp_logger.RunPodLogger() + + logger.set_level("INFO") + self.assertEqual(logger.level, "INFO") + + logger.set_level("WARN") + self.assertEqual(logger.level, "WARN") + + def test_call_log(self): + ''' + Tests that the logger can be called + ''' + log = rp_logger.RunPodLogger() + + with patch("runpod.serverless.modules.rp_logger.RunPodLogger.log") as mock_log: + + log.warn("Test log message") + + mock_log.assert_called_once_with("Test log message", "WARN")