31 changes: 20 additions & 11 deletions runpod/serverless/modules/rp_ping.py
@@ -4,9 +4,8 @@
"""

import os
import threading
import time

from multiprocessing import Process
import requests
from urllib3.util.retry import Retry

@@ -22,7 +21,7 @@
class Heartbeat:
"""Sends heartbeats to the Runpod server."""

_thread_started = False
_process_started = False

def __init__(self, pool_connections=10, retries=3) -> None:
"""
@@ -32,9 +31,10 @@ def __init__(self, pool_connections=10, retries=3) -> None:
self.PING_URL = self.PING_URL.replace("$RUNPOD_POD_ID", WORKER_ID)
self.PING_INTERVAL = int(os.environ.get("RUNPOD_PING_INTERVAL", 10000)) // 1000

# Create a new HTTP session
self._session = SyncClientSession()
self._session.headers.update(
{"Authorization": f"{os.environ.get('RUNPOD_AI_API_KEY')}"}
{"Authorization": os.environ.get("RUNPOD_AI_API_KEY", "")}
)

retry_strategy = Retry(
@@ -52,9 +52,18 @@ def __init__(self, pool_connections=10, retries=3) -> None:
self._session.mount("http://", adapter)
self._session.mount("https://", adapter)

@staticmethod
def process_loop(test=False):
"""
Static helper to run the ping loop in a separate process.
Creates a new Heartbeat instance to avoid pickling issues.
"""
hb = Heartbeat()
hb.ping_loop(test)

def start_ping(self, test=False):
"""
Sends heartbeat pings to the Runpod server.
Sends heartbeat pings to the Runpod server in a separate process.
"""
if not os.environ.get("RUNPOD_AI_API_KEY"):
log.debug("Not deployed on RunPod serverless, pings will not be sent.")
@@ -68,18 +77,19 @@ def start_ping(self, test=False):
log.error("Ping URL not set, cannot start ping.")
return

if not Heartbeat._thread_started:
threading.Thread(target=self.ping_loop, daemon=True, args=(test,)).start()
Heartbeat._thread_started = True
if not Heartbeat._process_started:
process = Process(target=Heartbeat.process_loop, args=(test,))
process.daemon = True
process.start()
Heartbeat._process_started = True

def ping_loop(self, test=False):
"""
Sends heartbeat pings to the Runpod server.
Sends heartbeat pings to the Runpod server until interrupted.
"""
while True:
self._send_ping()
time.sleep(self.PING_INTERVAL)

if test:
return

@@ -98,6 +108,5 @@ def _send_ping(self):
log.debug(
f"Heartbeat Sent | URL: {result.url} | Status: {result.status_code}"
)

except requests.RequestException as err:
log.error(f"Ping Request Error: {err}, attempting to restart ping.")
157 changes: 113 additions & 44 deletions runpod/serverless/modules/worker_state.py
@@ -5,6 +5,8 @@
import os
import time
import uuid
from multiprocessing import Manager
from multiprocessing.managers import SyncManager
from typing import Any, Dict, Optional

from .rp_logger import RunPodLogger
@@ -61,82 +63,149 @@ def __str__(self) -> str:
# ---------------------------------------------------------------------------- #
# Tracker #
# ---------------------------------------------------------------------------- #
class JobsProgress(set):
"""Track the state of current jobs in progress."""

_instance = None
class JobsProgress:
"""Track the state of current jobs in progress using shared memory."""

_instance: Optional['JobsProgress'] = None
_manager: SyncManager
_shared_data: Any
_lock: Any

def __new__(cls):
if JobsProgress._instance is None:
JobsProgress._instance = set.__new__(cls)
return JobsProgress._instance
if cls._instance is None:
instance = object.__new__(cls)
# Initialize instance variables
instance._manager = Manager()
instance._shared_data = instance._manager.dict()
instance._shared_data['jobs'] = instance._manager.list()
instance._lock = instance._manager.Lock()
cls._instance = instance
return cls._instance

def __init__(self):
# Everything is already initialized in __new__
pass

def __repr__(self) -> str:
return f"<{self.__class__.__name__}>: {self.get_job_list()}"

def clear(self) -> None:
return super().clear()
with self._lock:
self._shared_data['jobs'][:] = []

def add(self, element: Any):
"""
Adds a Job object to the set.
"""
if isinstance(element, str):
job_dict = {'id': element}
elif isinstance(element, dict):
job_dict = element
elif hasattr(element, 'id'):
job_dict = {'id': element.id}
else:
raise TypeError("Only Job objects can be added to JobsProgress.")

If the added element is a string, then `Job(id=element)` is added
with self._lock:
# Check if job already exists
job_list = self._shared_data['jobs']
for existing_job in job_list:
if existing_job['id'] == job_dict['id']:
return # Job already exists

# Add new job
job_list.append(job_dict)
log.debug(f"JobsProgress | Added job: {job_dict['id']}")

def get(self, element: Any) -> Optional[Job]:
"""
Retrieves a Job object from the set.

If the added element is a dict, that `Job(**element)` is added
If the element is a string, searches for Job with that id.
"""
if isinstance(element, str):
element = Job(id=element)

if isinstance(element, dict):
element = Job(**element)

if not isinstance(element, Job):
raise TypeError("Only Job objects can be added to JobsProgress.")
search_id = element
elif isinstance(element, Job):
search_id = element.id
else:
raise TypeError("Only Job objects can be retrieved from JobsProgress.")

return super().add(element)
with self._lock:
for job_dict in self._shared_data['jobs']:
if job_dict['id'] == search_id:
log.debug(f"JobsProgress | Retrieved job: {job_dict['id']}")
return Job(**job_dict)

return None

def remove(self, element: Any):
"""
Removes a Job object from the set.

If the element is a string, then `Job(id=element)` is removed

If the element is a dict, then `Job(**element)` is removed
"""
if isinstance(element, str):
element = Job(id=element)

if isinstance(element, dict):
element = Job(**element)

if not isinstance(element, Job):
job_id = element
elif isinstance(element, dict):
job_id = element.get('id')
elif hasattr(element, 'id'):
job_id = element.id
else:
raise TypeError("Only Job objects can be removed from JobsProgress.")

return super().discard(element)

def get(self, element: Any) -> Job:
if isinstance(element, str):
element = Job(id=element)

if not isinstance(element, Job):
raise TypeError("Only Job objects can be retrieved from JobsProgress.")

for job in self:
if job == element:
return job
with self._lock:
job_list = self._shared_data['jobs']
# Find and remove the job
for i, job_dict in enumerate(job_list):
if job_dict['id'] == job_id:
del job_list[i]
log.debug(f"JobsProgress | Removed job: {job_dict['id']}")
break

def get_job_list(self) -> str:
def get_job_list(self) -> Optional[str]:
"""
Returns the list of job IDs as comma-separated string.
"""
if not len(self):
with self._lock:
job_list = list(self._shared_data['jobs'])

if not job_list:
return None

return ",".join(str(job) for job in self)
log.debug(f"JobsProgress | Jobs in progress: {job_list}")
return ",".join(str(job_dict['id']) for job_dict in job_list)

def get_job_count(self) -> int:
"""
Returns the number of jobs.
"""
return len(self)
with self._lock:
return len(self._shared_data['jobs'])

def __iter__(self):
"""Make the class iterable - returns Job objects"""
with self._lock:
# Create a snapshot of jobs to avoid holding lock during iteration
job_dicts = list(self._shared_data['jobs'])

# Return an iterator of Job objects
return iter(Job(**job_dict) for job_dict in job_dicts)

def __len__(self):
"""Support len() operation"""
return self.get_job_count()

def __contains__(self, element: Any) -> bool:
"""Support 'in' operator"""
if isinstance(element, str):
search_id = element
elif isinstance(element, Job):
search_id = element.id
elif isinstance(element, dict):
search_id = element.get('id')
else:
return False

with self._lock:
for job_dict in self._shared_data['jobs']:
if job_dict['id'] == search_id:
return True
return False
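A short usage sketch of the Manager-backed tracker above (illustrative, not part of this diff): JobsProgress() always returns the same instance, and its state lives in multiprocessing.Manager proxies rather than a plain set, so it can be read from more than one process. Strings, dicts with an 'id' key, and Job objects are all accepted by add() and remove().

# Hedged usage sketch; module path taken from the file header above.
from runpod.serverless.modules.worker_state import JobsProgress

jobs = JobsProgress()              # singleton, backed by Manager proxies
jobs.add("job-123")                # stored as {'id': 'job-123'}; duplicates are ignored
assert "job-123" in jobs           # __contains__ matches on the job id
print(jobs.get_job_list())         # "job-123" (comma-separated ids), or None when empty
jobs.remove("job-123")
print(jobs.get_job_count())        # 0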