diff --git a/runpod/serverless/modules/rp_scale.py b/runpod/serverless/modules/rp_scale.py
index 77d71f14..da4b0fd0 100644
--- a/runpod/serverless/modules/rp_scale.py
+++ b/runpod/serverless/modules/rp_scale.py
@@ -82,7 +82,6 @@ def handle_shutdown(self, signum, frame):
 
     async def run(self):
         # Create an async session that will be closed when the worker is killed.
-
         async with AsyncClientSession() as session:
             # Create tasks for getting and running jobs.
             jobtake_task = asyncio.create_task(self.get_jobs(session))
@@ -90,26 +89,8 @@ async def run(self):
 
             tasks = [jobtake_task, jobrun_task]
 
-            try:
-                # Concurrently run both tasks and wait for both to finish.
-                await asyncio.gather(*tasks)
-            except asyncio.CancelledError:  # worker is killed
-                log.debug("Worker tasks cancelled.")
-                self.kill_worker()
-            finally:
-                # Handle the task cancellation gracefully
-                for task in tasks:
-                    if not task.done():
-                        task.cancel()
-                await asyncio.gather(*tasks, return_exceptions=True)
-                await self.cleanup()  # Ensure resources are cleaned up
-
-    async def cleanup(self):
-        # Perform any necessary cleanup here, such as closing connections
-        log.debug("Cleaning up resources before shutdown.")
-        # TODO: stop heartbeat or close any open connections
-        await asyncio.sleep(0)  # Give a chance for other tasks to run (optional)
-        log.debug("Cleanup complete.")
+            # Concurrently run both tasks and wait for both to finish.
+            await asyncio.gather(*tasks)
 
     def is_alive(self):
         """
@@ -121,6 +102,7 @@ def kill_worker(self):
         """
         Whether to kill the worker.
         """
+        log.info("Kill worker.")
         self._shutdown_event.set()
 
     async def get_jobs(self, session: ClientSession):
@@ -142,7 +124,7 @@ async def get_jobs(self, session: ClientSession):
             jobs_needed = self.current_concurrency - job_progress.get_job_count()
             if jobs_needed <= 0:
                 log.debug("JobScaler.get_jobs | Queue is full. Retrying soon.")
-                await asyncio.sleep(0.1)  # don't go rapidly
+                await asyncio.sleep(1)  # don't go rapidly
                 continue
 
             try:
@@ -150,34 +132,32 @@ async def get_jobs(self, session: ClientSession):
                 acquired_jobs = await asyncio.wait_for(
                     get_job(session, jobs_needed), timeout=30
                 )
+
+                if not acquired_jobs:
+                    log.debug("JobScaler.get_jobs | No jobs acquired.")
+                    continue
+
+                for job in acquired_jobs:
+                    await job_list.add_job(job)
+
+                log.info(f"Jobs in queue: {job_list.get_job_count()}")
+
             except TooManyRequests:
                 log.debug(f"JobScaler.get_jobs | Too many requests. Debounce for 5 seconds.")
                 await asyncio.sleep(5)  # debounce for 5 seconds
-                continue
             except asyncio.CancelledError:
                 log.debug("JobScaler.get_jobs | Request was cancelled.")
-                continue
             except TimeoutError:
                 log.debug("JobScaler.get_jobs | Job acquisition timed out. Retrying.")
-                continue
             except TypeError as error:
                 log.debug(f"JobScaler.get_jobs | Unexpected error: {error}.")
-                continue
             except Exception as error:
                 log.error(
                     f"Failed to get job. | Error Type: {type(error).__name__} | Error Message: {str(error)}"
                 )
-                continue
-
-            if not acquired_jobs:
-                log.debug("JobScaler.get_jobs | No jobs acquired.")
+            finally:
+                # Yield control back to the event loop
                 await asyncio.sleep(0)
-                continue
-
-            for job in acquired_jobs:
-                await job_list.add_job(job)
-
-            log.info(f"Jobs in queue: {job_list.get_job_count()}")
 
     async def run_jobs(self, session: ClientSession):
         """
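
For context on the control-flow change in get_jobs(), here is a minimal, self-contained sketch (not part of the patch) of the pattern the new code adopts: acquire work inside try, back off on rate limiting, and always yield to the event loop in finally. RateLimited and fetch_batch are hypothetical stand-ins for the SDK's TooManyRequests and get_job().

import asyncio
import random


class RateLimited(Exception):
    """Hypothetical stand-in for the SDK's TooManyRequests error."""


async def fetch_batch(needed: int) -> list:
    """Pretend to fetch up to `needed` jobs from a remote queue (stub)."""
    await asyncio.sleep(0.05)
    if random.random() < 0.2:
        raise RateLimited()
    return [f"job-{random.randint(0, 999)}" for _ in range(random.randint(0, needed))]


async def poll_for_jobs(concurrency: int, rounds: int) -> None:
    queue = []
    for _ in range(rounds):
        needed = concurrency - len(queue)
        if needed <= 0:
            await asyncio.sleep(1)  # queue is full; avoid a hot loop
            continue
        try:
            acquired = await asyncio.wait_for(fetch_batch(needed), timeout=30)
            if not acquired:
                continue
            queue.extend(acquired)
            print(f"Jobs in queue: {len(queue)}")
        except RateLimited:
            await asyncio.sleep(5)  # debounce, mirroring the TooManyRequests branch
        except asyncio.TimeoutError:
            pass  # retry on the next iteration
        finally:
            await asyncio.sleep(0)  # always yield control back to the event loop


if __name__ == "__main__":
    asyncio.run(poll_for_jobs(concurrency=3, rounds=10))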