From 2c203e8d6fbbeeff3a8ba1f679635b94a51f4938 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Fri, 27 Jun 2025 08:30:27 -0700 Subject: [PATCH] fix: No module named 'fcntl' --- .gitignore | 2 + requirements.txt | 1 + runpod/serverless/modules/worker_state.py | 56 +++++++++++------------ 3 files changed, 29 insertions(+), 30 deletions(-) diff --git a/.gitignore b/.gitignore index 2557451c..1ccfa853 100644 --- a/.gitignore +++ b/.gitignore @@ -138,3 +138,5 @@ dmypy.json .pyre/ runpod/_version.py .runpod_jobs.pkl + +*.lock diff --git a/requirements.txt b/requirements.txt index 60035ec1..fc8a46a5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ click >= 8.1.7 colorama >= 0.2.5, < 0.4.7 cryptography < 46.0.0 fastapi[all] >= 0.94.0 +filelock >= 3.0.0 paramiko >= 3.3.1 prettytable >= 3.9.0 py-cpuinfo >= 9.0.0 diff --git a/runpod/serverless/modules/worker_state.py b/runpod/serverless/modules/worker_state.py index 2dad0a07..b546ce02 100644 --- a/runpod/serverless/modules/worker_state.py +++ b/runpod/serverless/modules/worker_state.py @@ -6,10 +6,11 @@ import time import uuid import pickle -import fcntl import tempfile from typing import Any, Dict, Optional, Set +from filelock import FileLock + from .rp_logger import RunPodLogger @@ -95,25 +96,23 @@ def _load_state(self): os.path.exists(self._STATE_FILE) and os.path.getsize(self._STATE_FILE) > 0 ): - with open(self._STATE_FILE, "rb") as f: - fcntl.flock(f, fcntl.LOCK_SH) - try: - loaded_jobs = pickle.load(f) - # Clear current state and add loaded jobs - super().clear() - for job in loaded_jobs: - set.add( - self, job - ) # Use set.add to avoid triggering _save_state - - except (EOFError, pickle.UnpicklingError): - # Handle empty or corrupted file - log.debug( - "JobsProgress: Failed to load state file, starting with empty state" - ) - pass - finally: - fcntl.flock(f, fcntl.LOCK_UN) + with FileLock(self._STATE_FILE + '.lock'): + with open(self._STATE_FILE, "rb") as f: + try: + loaded_jobs = pickle.load(f) + # Clear current state and add loaded jobs + super().clear() + for job in loaded_jobs: + set.add( + self, job + ) # Use set.add to avoid triggering _save_state + + except (EOFError, pickle.UnpicklingError): + # Handle empty or corrupted file + log.debug( + "JobsProgress: Failed to load state file, starting with empty state" + ) + pass except FileNotFoundError: log.debug("JobsProgress: No state file found, starting with empty state") @@ -123,17 +122,14 @@ def _save_state(self): """Save jobs state to pickle file with atomic write and file locking.""" try: # Use temporary file for atomic write - with tempfile.NamedTemporaryFile( - dir=self._STATE_DIR, delete=False, mode="wb" - ) as temp_f: - fcntl.flock(temp_f, fcntl.LOCK_EX) - try: + with FileLock(self._STATE_FILE + '.lock'): + with tempfile.NamedTemporaryFile( + dir=self._STATE_DIR, delete=False, mode="wb" + ) as temp_f: pickle.dump(set(self), temp_f) - finally: - fcntl.flock(temp_f, fcntl.LOCK_UN) - - # Atomically replace the state file - os.replace(temp_f.name, self._STATE_FILE) + + # Atomically replace the state file + os.replace(temp_f.name, self._STATE_FILE) except Exception as e: log.error(f"Failed to save job state: {e}")