Skip to content

Commit e42ea02

Browse files
authored
fix: ping fails when a process is blocking GIL (#425)
* refactor: track JobsProgress state using shared memory * refactor: Heartbeat uses separate process for independent GIL * update: run_scale test supports new JobsProgress * fix: broken tests
1 parent 59d5c46 commit e42ea02

File tree

4 files changed

+473
-261
lines changed

4 files changed

+473
-261
lines changed

runpod/serverless/modules/rp_ping.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@
44
"""
55

66
import os
7-
import threading
87
import time
9-
8+
from multiprocessing import Process
109
import requests
1110
from urllib3.util.retry import Retry
1211

@@ -22,7 +21,7 @@
2221
class Heartbeat:
2322
"""Sends heartbeats to the Runpod server."""
2423

25-
_thread_started = False
24+
_process_started = False
2625

2726
def __init__(self, pool_connections=10, retries=3) -> None:
2827
"""
@@ -32,9 +31,10 @@ def __init__(self, pool_connections=10, retries=3) -> None:
3231
self.PING_URL = self.PING_URL.replace("$RUNPOD_POD_ID", WORKER_ID)
3332
self.PING_INTERVAL = int(os.environ.get("RUNPOD_PING_INTERVAL", 10000)) // 1000
3433

34+
# Create a new HTTP session
3535
self._session = SyncClientSession()
3636
self._session.headers.update(
37-
{"Authorization": f"{os.environ.get('RUNPOD_AI_API_KEY')}"}
37+
{"Authorization": os.environ.get("RUNPOD_AI_API_KEY", "")}
3838
)
3939

4040
retry_strategy = Retry(
@@ -52,9 +52,18 @@ def __init__(self, pool_connections=10, retries=3) -> None:
5252
self._session.mount("http://", adapter)
5353
self._session.mount("https://", adapter)
5454

55+
@staticmethod
56+
def process_loop(test=False):
57+
"""
58+
Static helper to run the ping loop in a separate process.
59+
Creates a new Heartbeat instance to avoid pickling issues.
60+
"""
61+
hb = Heartbeat()
62+
hb.ping_loop(test)
63+
5564
def start_ping(self, test=False):
5665
"""
57-
Sends heartbeat pings to the Runpod server.
66+
Sends heartbeat pings to the Runpod server in a separate process.
5867
"""
5968
if not os.environ.get("RUNPOD_AI_API_KEY"):
6069
log.debug("Not deployed on RunPod serverless, pings will not be sent.")
@@ -68,18 +77,19 @@ def start_ping(self, test=False):
6877
log.error("Ping URL not set, cannot start ping.")
6978
return
7079

71-
if not Heartbeat._thread_started:
72-
threading.Thread(target=self.ping_loop, daemon=True, args=(test,)).start()
73-
Heartbeat._thread_started = True
80+
if not Heartbeat._process_started:
81+
process = Process(target=Heartbeat.process_loop, args=(test,))
82+
process.daemon = True
83+
process.start()
84+
Heartbeat._process_started = True
7485

7586
def ping_loop(self, test=False):
7687
"""
77-
Sends heartbeat pings to the Runpod server.
88+
Sends heartbeat pings to the Runpod server until interrupted.
7889
"""
7990
while True:
8091
self._send_ping()
8192
time.sleep(self.PING_INTERVAL)
82-
8393
if test:
8494
return
8595

@@ -98,6 +108,5 @@ def _send_ping(self):
98108
log.debug(
99109
f"Heartbeat Sent | URL: {result.url} | Status: {result.status_code}"
100110
)
101-
102111
except requests.RequestException as err:
103112
log.error(f"Ping Request Error: {err}, attempting to restart ping.")

runpod/serverless/modules/worker_state.py

Lines changed: 113 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import os
66
import time
77
import uuid
8+
from multiprocessing import Manager
9+
from multiprocessing.managers import SyncManager
810
from typing import Any, Dict, Optional
911

1012
from .rp_logger import RunPodLogger
@@ -61,82 +63,149 @@ def __str__(self) -> str:
6163
# ---------------------------------------------------------------------------- #
6264
# Tracker #
6365
# ---------------------------------------------------------------------------- #
64-
class JobsProgress(set):
65-
"""Track the state of current jobs in progress."""
66-
67-
_instance = None
66+
class JobsProgress:
67+
"""Track the state of current jobs in progress using shared memory."""
68+
69+
_instance: Optional['JobsProgress'] = None
70+
_manager: SyncManager
71+
_shared_data: Any
72+
_lock: Any
6873

6974
def __new__(cls):
70-
if JobsProgress._instance is None:
71-
JobsProgress._instance = set.__new__(cls)
72-
return JobsProgress._instance
75+
if cls._instance is None:
76+
instance = object.__new__(cls)
77+
# Initialize instance variables
78+
instance._manager = Manager()
79+
instance._shared_data = instance._manager.dict()
80+
instance._shared_data['jobs'] = instance._manager.list()
81+
instance._lock = instance._manager.Lock()
82+
cls._instance = instance
83+
return cls._instance
84+
85+
def __init__(self):
86+
# Everything is already initialized in __new__
87+
pass
7388

7489
def __repr__(self) -> str:
7590
return f"<{self.__class__.__name__}>: {self.get_job_list()}"
7691

7792
def clear(self) -> None:
78-
return super().clear()
93+
with self._lock:
94+
self._shared_data['jobs'][:] = []
7995

8096
def add(self, element: Any):
8197
"""
8298
Adds a Job object to the set.
99+
"""
100+
if isinstance(element, str):
101+
job_dict = {'id': element}
102+
elif isinstance(element, dict):
103+
job_dict = element
104+
elif hasattr(element, 'id'):
105+
job_dict = {'id': element.id}
106+
else:
107+
raise TypeError("Only Job objects can be added to JobsProgress.")
83108

84-
If the added element is a string, then `Job(id=element)` is added
109+
with self._lock:
110+
# Check if job already exists
111+
job_list = self._shared_data['jobs']
112+
for existing_job in job_list:
113+
if existing_job['id'] == job_dict['id']:
114+
return # Job already exists
115+
116+
# Add new job
117+
job_list.append(job_dict)
118+
log.debug(f"JobsProgress | Added job: {job_dict['id']}")
119+
120+
def get(self, element: Any) -> Optional[Job]:
121+
"""
122+
Retrieves a Job object from the set.
85123
86-
If the added element is a dict, that `Job(**element)` is added
124+
If the element is a string, searches for Job with that id.
87125
"""
88126
if isinstance(element, str):
89-
element = Job(id=element)
90-
91-
if isinstance(element, dict):
92-
element = Job(**element)
93-
94-
if not isinstance(element, Job):
95-
raise TypeError("Only Job objects can be added to JobsProgress.")
127+
search_id = element
128+
elif isinstance(element, Job):
129+
search_id = element.id
130+
else:
131+
raise TypeError("Only Job objects can be retrieved from JobsProgress.")
96132

97-
return super().add(element)
133+
with self._lock:
134+
for job_dict in self._shared_data['jobs']:
135+
if job_dict['id'] == search_id:
136+
log.debug(f"JobsProgress | Retrieved job: {job_dict['id']}")
137+
return Job(**job_dict)
138+
139+
return None
98140

99141
def remove(self, element: Any):
100142
"""
101143
Removes a Job object from the set.
102-
103-
If the element is a string, then `Job(id=element)` is removed
104-
105-
If the element is a dict, then `Job(**element)` is removed
106144
"""
107145
if isinstance(element, str):
108-
element = Job(id=element)
109-
110-
if isinstance(element, dict):
111-
element = Job(**element)
112-
113-
if not isinstance(element, Job):
146+
job_id = element
147+
elif isinstance(element, dict):
148+
job_id = element.get('id')
149+
elif hasattr(element, 'id'):
150+
job_id = element.id
151+
else:
114152
raise TypeError("Only Job objects can be removed from JobsProgress.")
115153

116-
return super().discard(element)
117-
118-
def get(self, element: Any) -> Job:
119-
if isinstance(element, str):
120-
element = Job(id=element)
121-
122-
if not isinstance(element, Job):
123-
raise TypeError("Only Job objects can be retrieved from JobsProgress.")
124-
125-
for job in self:
126-
if job == element:
127-
return job
154+
with self._lock:
155+
job_list = self._shared_data['jobs']
156+
# Find and remove the job
157+
for i, job_dict in enumerate(job_list):
158+
if job_dict['id'] == job_id:
159+
del job_list[i]
160+
log.debug(f"JobsProgress | Removed job: {job_dict['id']}")
161+
break
128162

129-
def get_job_list(self) -> str:
163+
def get_job_list(self) -> Optional[str]:
130164
"""
131165
Returns the list of job IDs as comma-separated string.
132166
"""
133-
if not len(self):
167+
with self._lock:
168+
job_list = list(self._shared_data['jobs'])
169+
170+
if not job_list:
134171
return None
135172

136-
return ",".join(str(job) for job in self)
173+
log.debug(f"JobsProgress | Jobs in progress: {job_list}")
174+
return ",".join(str(job_dict['id']) for job_dict in job_list)
137175

138176
def get_job_count(self) -> int:
139177
"""
140178
Returns the number of jobs.
141179
"""
142-
return len(self)
180+
with self._lock:
181+
return len(self._shared_data['jobs'])
182+
183+
def __iter__(self):
184+
"""Make the class iterable - returns Job objects"""
185+
with self._lock:
186+
# Create a snapshot of jobs to avoid holding lock during iteration
187+
job_dicts = list(self._shared_data['jobs'])
188+
189+
# Return an iterator of Job objects
190+
return iter(Job(**job_dict) for job_dict in job_dicts)
191+
192+
def __len__(self):
193+
"""Support len() operation"""
194+
return self.get_job_count()
195+
196+
def __contains__(self, element: Any) -> bool:
197+
"""Support 'in' operator"""
198+
if isinstance(element, str):
199+
search_id = element
200+
elif isinstance(element, Job):
201+
search_id = element.id
202+
elif isinstance(element, dict):
203+
search_id = element.get('id')
204+
else:
205+
return False
206+
207+
with self._lock:
208+
for job_dict in self._shared_data['jobs']:
209+
if job_dict['id'] == search_id:
210+
return True
211+
return False

0 commit comments

Comments
 (0)