raise RuntimeError("IOLoop is closed") during script shutdown #420

Open
afsangujarati93 opened this issue Dec 24, 2023 · 1 comment

afsangujarati93 commented Dec 24, 2023

Describe the issue:
After finding nothing useful online, I reduced my code to the smallest snippet that still triggers this error:

raise RuntimeError("IOLoop is closed")

All my code does is set up a cluster in ECS and tear it down. The cluster spins up successfully in AWS ECS; the error occurs at the very end, when the notebook is shutting down. The stack trace and other details are below.

Minimal Complete Verifiable Example:

from dotenv import load_dotenv
import os
import logging
from dask_cloudprovider.aws import ECSCluster
from dask.distributed import Client

load_dotenv(verbose=True, override=True)

AWS_ACCOUNT_ID = os.environ["AWS_ACCOUNT_ID"]
AWS_DEFAULT_REGION = os.environ["AWS_DEFAULT_REGION"]
VPC_ID = os.environ["VPC_ID"]
SUBNET_ID = os.environ["SUBNET_ID"]
SECURITY_GROUP = os.environ["SECURITY_GROUP"]

logging.basicConfig(level=logging.INFO)

logging.info("Initializing clusters ...")
with ECSCluster(
    cluster_name_template="dask-datascience",
    fargate_scheduler=True,
    fargate_workers=True,
    fargate_spot=True,
    vpc=VPC_ID,
    subnets=[SUBNET_ID],
    security_groups=[SECURITY_GROUP],
    worker_cpu=512,
    worker_mem=1024,
    n_workers=1
) as cluster:
    with Client(cluster) as client:
        logging.info("Going ....")

Anything else we need to know?:

Stacktrace

INFO:root:Going ....
Traceback (most recent call last):
  File "/Users/afsan.gujarati/.pyenv/versions/3.10.12/lib/python3.10/weakref.py", line 667, in _exitfunc
    f()
  File "/Users/afsan.gujarati/.pyenv/versions/3.10.12/lib/python3.10/weakref.py", line 591, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/Users/afsan.gujarati/.pyenv/versions/dask-test/lib/python3.10/site-packages/distributed/utils.py", line 358, in sync
    return sync(
  File "/Users/afsan.gujarati/.pyenv/versions/dask-test/lib/python3.10/site-packages/distributed/utils.py", line 387, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed
[The identical traceback above is printed four more times, five in total.]
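
The frames in the traceback (`weakref.py`, `_exitfunc`, then the finalizer's `__call__`, then `distributed/utils.py` `sync`) indicate that a `weakref.finalize` callback is firing at interpreter exit and trying to use an event loop that has already been closed. The sketch below reproduces that ordering with standard-library code only. `FakeLoop`, `sync`, and `Resource` are hypothetical stand-ins written for illustration; they are not the implementation in `distributed` or `dask_cloudprovider`.

```python
import weakref


class FakeLoop:
    """Hypothetical stand-in for an event loop that can be closed."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def sync(loop, action):
    """Hypothetical helper: refuse to run anything on a closed loop."""
    if loop.closed:
        raise RuntimeError("IOLoop is closed")
    return f"ran {action}"


class Resource:
    """Hypothetical object whose cleanup needs the loop."""

    def __init__(self, loop):
        # Finalizers that are still alive at interpreter exit are run by
        # weakref's atexit hook (the _exitfunc frame in the traceback).
        self.finalizer = weakref.finalize(self, sync, loop, "cleanup")


def finalizer_error_after_close():
    loop = FakeLoop()
    resource = Resource(loop)
    loop.close()  # the loop is closed first ...
    try:
        resource.finalizer()  # ... then the finalizer fires, as it would at exit
    except RuntimeError as exc:
        return str(exc)
    return None
```

When finalizers run at exit, `weakref` reports each exception through `sys.excepthook` and moves on to the next finalizer, which would be consistent with seeing one traceback per leftover finalizer rather than a single crash.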

Package versions:

distributed==2023.12.1
dask==2023.12.1
dask_cloudprovider==2022.10.0
tornado==6.3.3

Environment:

  • Dask version: 2023.12.1
  • Python version: 3.10.12
  • Operating System: macOS Sonoma 14.1
  • Install method (conda, pip, source): pip
@afsangujarati93
Author

I also tried running with asynchronous=True, using the following code:

from dotenv import load_dotenv
import os
import logging
from dask_cloudprovider.aws import ECSCluster
from dask.distributed import Client

load_dotenv(verbose=True, override=True)

AWS_ACCOUNT_ID = os.environ["AWS_ACCOUNT_ID"]
AWS_DEFAULT_REGION = os.environ["AWS_DEFAULT_REGION"]
VPC_ID = os.environ["VPC_ID"]
SUBNET_ID = os.environ["SUBNET_ID"]
SECURITY_GROUP = os.environ["SECURITY_GROUP"]

logging.basicConfig(level=logging.INFO)

async def f():
    logging.info("Initializing clusters ...")
    cluster = await ECSCluster(
        cluster_name_template="dask-datascience",
        fargate_scheduler=True,
        fargate_workers=True,
        fargate_spot=True,
        vpc=VPC_ID,
        subnets=[SUBNET_ID],
        security_groups=[SECURITY_GROUP],
        worker_cpu=512,
        worker_mem=1024,
        n_workers=1,
        asynchronous=True
    )
    logging.info("in cluster ...")
    client = await Client(cluster, asynchronous=True)
    logging.info("initialized client ...")
    await client.close()
    logging.info("closed client ...")
    await cluster.close()
    logging.info("closed cluster ...")
    
    return None
  

# Run the coroutine to completion on the asyncio event loop
import asyncio

loop = asyncio.get_event_loop()
loop.run_until_complete(f())
logging.info("After complete ...")

import threading

# After the event loop has finished, list the threads that are still alive
for thread in threading.enumerate():
    print(f"Thread: {thread.name}, Daemon: {thread.daemon}")

logging.info("Before exit ...")

os._exit(0)  # Added this because the script won't exit otherwise

In this case the logs showed that f() ran to completion, but the script did not exit. The thread listing at the end of the script printed the following:

Thread: MainThread, Daemon: False
Thread: asyncio_0, Daemon: False
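
One way to find out what a lingering non-daemon thread such as `asyncio_0` is waiting on is to print each live thread's current stack just before exit. This is a minimal sketch using only the standard library; `format_thread_stacks` is a hypothetical helper name, and `sys._current_frames()` is a CPython-specific function.

```python
import sys
import threading
import traceback


def format_thread_stacks():
    """Return a report with the current stack of every live thread."""
    frames = sys._current_frames()  # maps thread ident -> topmost frame
    lines = []
    for thread in threading.enumerate():
        lines.append(f"Thread: {thread.name}, Daemon: {thread.daemon}")
        frame = frames.get(thread.ident)
        if frame is not None:
            # format_stack returns the frames from outermost to innermost
            lines.extend(entry.rstrip() for entry in traceback.format_stack(frame))
    return "\n".join(lines)
```

Calling `print(format_thread_stacks())` in place of the thread loop would show which function each thread is currently blocked in.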

So a second, non-daemon thread named asyncio_0, probably started somewhere in Dask or dask_cloudprovider, was never shut down. As a temporary measure, I added an explicit os._exit(0) to force the script to exit. I know this isn't an ideal solution, but I didn't know what else to look for.
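
For context on the thread name: asyncio's default executor is a `ThreadPoolExecutor` created with the name prefix `asyncio`, so its first worker thread is called `asyncio_0`. The sketch below shows such a thread appearing after `run_in_executor(None, ...)` and disappearing after `loop.shutdown_default_executor()` (available since Python 3.9). Whether the thread in this report comes from that executor, and whether shutting it down would let the script exit, is an assumption that has not been verified against Dask or dask_cloudprovider. `worker_thread_names`, `use_default_executor`, and `demo` are hypothetical names.

```python
import asyncio
import threading
import time


def worker_thread_names():
    """Return the names of live threads other than the main thread."""
    main = threading.main_thread()
    return sorted(t.name for t in threading.enumerate() if t is not main)


async def use_default_executor():
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor,
    # whose worker threads are named with the "asyncio" prefix.
    await loop.run_in_executor(None, time.sleep, 0.01)


def demo():
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(use_default_executor())
        before = worker_thread_names()
        # Wait for the default executor's worker threads to finish.
        loop.run_until_complete(loop.shutdown_default_executor())
        after = worker_thread_names()
    finally:
        loop.close()
    return before, after
```

Note that `asyncio.run()` performs this executor shutdown itself before closing the loop, whereas a bare `loop.run_until_complete()` does not.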
