Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ env
.env
.vscode
test_wrapper.py
test_logging.py

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
14 changes: 13 additions & 1 deletion docs/serverless/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

The worker outputs logs to the console at different points in the workers lifecycle. These logs can be used to debug issues with the worker or handler. There are four logging levels that can be used to control the verbosity of the logs:

0. `NOTSET` - Does not output any logs.

1. `DEBUG` (Default) - Outputs all logs, including debug logs.

2. `INFO` - Outputs all logs except debug logs.
Expand All @@ -12,7 +14,17 @@ The worker outputs logs to the console at different points in the workers lifecy

4. `ERROR` - Outputs only error logs.

To set the logging level, set the `RUNPOD_DEBUG_LEVEL` environment variable to one of the above logging levels. For example, to set the logging level to `INFO`, set the `RUNPOD_DEBUG_LEVEL` environment variable to `INFO`.
### Setting the Logging Level

There are two ways to set the logging level:

1. Set the `RUNPOD_DEBUG_LEVEL` environment variable to one of the above logging levels.

2. Set the `rp_log_level` argument when calling the file with your handler. If this value is set, it will override the `RUNPOD_DEBUG_LEVEL` environment variable.

```python
python worker.py --rp_log_level='INFO'
```

## Error Handling

Expand Down
16 changes: 15 additions & 1 deletion runpod/serverless/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,25 @@

from . import work_loop
from .modules import rp_fastapi
from .modules.rp_logger import RunPodLogger

log = RunPodLogger()

# ---------------------------------------------------------------------------- #
# Run Time Arguments #
# ---------------------------------------------------------------------------- #
parser = argparse.ArgumentParser()
# Arguments will be passed in with the config under the key "rp_args"
parser = argparse.ArgumentParser(
prog="runpod",
description="Runpod Serverless Worker Arguments."
)
parser.add_argument("--test_input", type=str, default=None,
help="Test input for the worker, formatted as JSON.")
parser.add_argument("--rp_debugger", action="store_true", default=None,
help="Flag to enable the Debugger.")
parser.add_argument("rp_log_level", default=None,
help="""Controls what level of logs are printed to the console.
Options: ERROR, WARN, INFO, and DEBUG.""")


def _set_config_args(config) -> dict:
Expand All @@ -35,6 +45,10 @@ def _set_config_args(config) -> dict:
if config["rp_args"]["test_input"]:
config["rp_args"]["test_input"] = json.loads(config["rp_args"]["test_input"])

# Set the log level
if config["rp_args"]["rp_log_level"]:
log.set_level(config["rp_args"]["rp_debug_level"])

return config


Expand Down
4 changes: 3 additions & 1 deletion runpod/serverless/modules/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

import requests

import runpod.serverless.modules.logging as log
from runpod.serverless.modules.rp_logger import RunPodLogger
from .worker_state import get_current_job_id, PING_URL, PING_INTERVAL

log = RunPodLogger()

_session = requests.Session()
_session.headers.update({"Authorization": f"{os.environ.get('RUNPOD_AI_API_KEY')}"})

Expand Down
4 changes: 3 additions & 1 deletion runpod/serverless/modules/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import traceback
from aiohttp import ClientSession

import runpod.serverless.modules.logging as log
from runpod.serverless.modules.rp_logger import RunPodLogger
from .worker_state import IS_LOCAL_TEST, JOB_GET_URL
from .rp_tips import check_return_size

log = RunPodLogger()


def _get_local() -> Optional[Dict[str, Any]]:
"""
Expand Down
85 changes: 0 additions & 85 deletions runpod/serverless/modules/logging.py

This file was deleted.

4 changes: 3 additions & 1 deletion runpod/serverless/modules/rp_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

import json

import runpod.serverless.modules.logging as log
from runpod.serverless.modules.rp_logger import RunPodLogger
from .retry import retry
from .worker_state import IS_LOCAL_TEST, get_done_url, get_stream_url

log = RunPodLogger()


@retry(max_attempts=3, base_delay=1, max_delay=3)
async def transmit(session, job_data, url):
Expand Down
113 changes: 113 additions & 0 deletions runpod/serverless/modules/rp_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
'''
PodWorker | modules | logging.py

Log Levels (Level - Value - Description)

NOTSET - 0 - No logging is configured, the logging system is effectively disabled.
DEBUG - 1 - Detailed information, typically of interest only when diagnosing problems. (Default)
INFO - 2 - Confirmation that things are working as expected.
WARN - 3 - An indication that something unexpected happened.
ERROR - 4 - Serious problem, the software has not been able to perform some function.
'''

import os
from dotenv import load_dotenv

env_path = os.path.join(os.getcwd(), '.env')
load_dotenv(env_path) # Load environment variables

LOG_LEVELS = ['NOTSET', 'DEBUG', 'INFO', 'WARN', 'ERROR']


def _validate_log_level(log_level):
'''
Checks the debug level and returns the debug level name.
'''
if isinstance(log_level, str):
log_level = log_level.upper()

if log_level not in LOG_LEVELS:
raise ValueError(f'Invalid debug level: {log_level}')

return log_level

if isinstance(log_level, int):
if log_level < 0 or log_level > 4:
raise ValueError(f'Invalid debug level: {log_level}')

return LOG_LEVELS[log_level]

raise ValueError(f'Invalid debug level: {log_level}')


class RunPodLogger:
'''Singleton class for logging.'''

__instance = None
level = _validate_log_level(os.environ.get('RUNPOD_DEBUG_LEVEL', 'DEBUG'))

def __new__(cls):
if RunPodLogger.__instance is None:
RunPodLogger.__instance = object.__new__(cls)
return RunPodLogger.__instance

def set_level(self, new_level):
'''
Set the debug level for logging.
Can be set to the name or value of the debug level.
'''
self.level = _validate_log_level(new_level)
self.info(f'Log level set to {self.level}')

def log(self, message, message_level='INFO'):
'''
Log message to stdout if RUNPOD_DEBUG is true.
'''
if self.level == 'NOTSET':
return

level_index = LOG_LEVELS.index(self.level)
if level_index > LOG_LEVELS.index(message_level) and message_level != 'TIP':
return

print(f'{message_level.ljust(7)}| {message}', flush=True)

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information

This expression logs [sensitive data (secret)](1) as clear text. This expression logs [sensitive data (secret)](2) as clear text. This expression logs [sensitive data (secret)](3) as clear text.
return

def secret(self, secret_name, secret):
'''
Censors secrets for logging.
Replaces everything except the first and last characters with *
'''
secret = str(secret)
redacted_secret = secret[0] + '*' * (len(secret)-2) + secret[-1]
self.info(f"{secret_name}: {redacted_secret}")

def debug(self, message):
'''
debug log
'''
self.log(message, 'DEBUG')

def info(self, message):
'''
info log
'''
self.log(message, 'INFO')

def warn(self, message):
'''
warn log
'''
self.log(message, 'WARN')

def error(self, message):
'''
error log
'''
self.log(message, 'ERROR')

def tip(self, message):
'''
tip log
'''
self.log(message, 'TIP')
4 changes: 3 additions & 1 deletion runpod/serverless/modules/rp_tips.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
'''

import sys
import runpod.serverless.modules.logging as log
import runpod.serverless.modules.rp_logger as RunPodLogger

log = RunPodLogger.RunPodLogger()


def check_return_size(return_body):
Expand Down
4 changes: 3 additions & 1 deletion runpod/serverless/work_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@

import aiohttp

import runpod.serverless.modules.logging as log
from runpod.serverless.modules.rp_logger import RunPodLogger
from .modules.heartbeat import HeartbeatSender
from .modules.job import get_job, run_job, run_job_generator
from .modules.rp_http import send_result, stream_result
from .modules.worker_state import REF_COUNT_ZERO, set_job_id
from .utils import rp_debugger

log = RunPodLogger()

_TIMEOUT = aiohttp.ClientTimeout(total=300, connect=2, sock_connect=2)

heartbeat = HeartbeatSender()
Expand Down
51 changes: 51 additions & 0 deletions tests/test_serverless/test_module_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
''' Tests for runpod.serverless.modules.rp_logger '''

import unittest
from unittest.mock import patch

from runpod.serverless.modules import rp_logger


class TestLogger(unittest.TestCase):
''' Tests for rp_logger '''

def test_default_log_level(self):
'''
Tests that the default log level is DEBUG
'''
logger = rp_logger.RunPodLogger()

self.assertEqual(logger.level, "DEBUG")

def test_singleton(self):
'''
Tests that the logger is a singleton
'''
logger1 = rp_logger.RunPodLogger()
logger2 = rp_logger.RunPodLogger()

self.assertIs(logger1, logger2)

def test_set_log_level(self):
'''
Tests that the log level can be set
'''
logger = rp_logger.RunPodLogger()

logger.set_level("INFO")
self.assertEqual(logger.level, "INFO")

logger.set_level("WARN")
self.assertEqual(logger.level, "WARN")

def test_call_log(self):
'''
Tests that the logger can be called
'''
log = rp_logger.RunPodLogger()

with patch("runpod.serverless.modules.rp_logger.RunPodLogger.log") as mock_log:

log.warn("Test log message")

mock_log.assert_called_once_with("Test log message", "WARN")