Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,5 @@ dmypy.json
.pyre/
runpod/_version.py
.runpod_jobs.pkl

*.lock
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ click >= 8.1.7
colorama >= 0.2.5, < 0.4.7
cryptography < 46.0.0
fastapi[all] >= 0.94.0
filelock >= 3.0.0
paramiko >= 3.3.1
prettytable >= 3.9.0
py-cpuinfo >= 9.0.0
Expand Down
56 changes: 26 additions & 30 deletions runpod/serverless/modules/worker_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
import time
import uuid
import pickle
import fcntl
import tempfile
from typing import Any, Dict, Optional, Set

from filelock import FileLock

from .rp_logger import RunPodLogger


Expand Down Expand Up @@ -95,25 +96,23 @@ def _load_state(self):
os.path.exists(self._STATE_FILE)
and os.path.getsize(self._STATE_FILE) > 0
):
with open(self._STATE_FILE, "rb") as f:
fcntl.flock(f, fcntl.LOCK_SH)
try:
loaded_jobs = pickle.load(f)
# Clear current state and add loaded jobs
super().clear()
for job in loaded_jobs:
set.add(
self, job
) # Use set.add to avoid triggering _save_state

except (EOFError, pickle.UnpicklingError):
# Handle empty or corrupted file
log.debug(
"JobsProgress: Failed to load state file, starting with empty state"
)
pass
finally:
fcntl.flock(f, fcntl.LOCK_UN)
with FileLock(self._STATE_FILE + '.lock'):
with open(self._STATE_FILE, "rb") as f:
try:
loaded_jobs = pickle.load(f)
# Clear current state and add loaded jobs
super().clear()
for job in loaded_jobs:
set.add(
self, job
) # Use set.add to avoid triggering _save_state

except (EOFError, pickle.UnpicklingError):
# Handle empty or corrupted file
log.debug(
"JobsProgress: Failed to load state file, starting with empty state"
)
pass

except FileNotFoundError:
log.debug("JobsProgress: No state file found, starting with empty state")
Expand All @@ -123,17 +122,14 @@ def _save_state(self):
"""Save jobs state to pickle file with atomic write and file locking."""
try:
# Use temporary file for atomic write
with tempfile.NamedTemporaryFile(
dir=self._STATE_DIR, delete=False, mode="wb"
) as temp_f:
fcntl.flock(temp_f, fcntl.LOCK_EX)
try:
with FileLock(self._STATE_FILE + '.lock'):
with tempfile.NamedTemporaryFile(
dir=self._STATE_DIR, delete=False, mode="wb"
) as temp_f:
pickle.dump(set(self), temp_f)
finally:
fcntl.flock(temp_f, fcntl.LOCK_UN)

# Atomically replace the state file
os.replace(temp_f.name, self._STATE_FILE)

# Atomically replace the state file
os.replace(temp_f.name, self._STATE_FILE)
except Exception as e:
log.error(f"Failed to save job state: {e}")

Expand Down
Loading