Skip to content

Commit

Permalink
Avoid ResourceWarning on implicitly closed event pipe sockets (#1125)
Browse files Browse the repository at this point in the history
Co-authored-by: Steven Silvester <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Jun 12, 2023
1 parent 1c7f626 commit 112ca66
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 7 deletions.
50 changes: 43 additions & 7 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import asyncio
import atexit
import io
import os
Expand All @@ -14,8 +15,7 @@
from collections import deque
from io import StringIO, TextIOBase
from threading import local
from typing import Any, Callable, Deque, Optional
from weakref import WeakSet
from typing import Any, Callable, Deque, Dict, Optional

import zmq
from jupyter_client.session import extract_header
Expand Down Expand Up @@ -63,7 +63,10 @@ def __init__(self, socket, pipe=False):
self._setup_pipe_in()
self._local = threading.local()
self._events: Deque[Callable[..., Any]] = deque()
self._event_pipes: WeakSet[Any] = WeakSet()
self._event_pipes: Dict[threading.Thread, Any] = {}
self._event_pipe_gc_lock: threading.Lock = threading.Lock()
self._event_pipe_gc_seconds: float = 10
self._event_pipe_gc_task: Optional[asyncio.Task] = None
self._setup_event_pipe()
self.thread = threading.Thread(target=self._thread_main, name="IOPub")
self.thread.daemon = True
Expand All @@ -73,7 +76,18 @@ def __init__(self, socket, pipe=False):

def _thread_main(self):
"""The inner loop that's actually run in a thread"""

def _start_event_gc():
self._event_pipe_gc_task = asyncio.ensure_future(self._run_event_pipe_gc())

self.io_loop.run_sync(_start_event_gc)
self.io_loop.start()
if self._event_pipe_gc_task is not None:
# cancel gc task to avoid pending task warnings
async def _cancel():
self._event_pipe_gc_task.cancel() # type:ignore

self.io_loop.run_sync(_cancel)
self.io_loop.close(all_fds=True)

def _setup_event_pipe(self):
Expand All @@ -88,6 +102,26 @@ def _setup_event_pipe(self):
self._event_puller = ZMQStream(pipe_in, self.io_loop)
self._event_puller.on_recv(self._handle_event)

async def _run_event_pipe_gc(self):
"""Task to run event pipe gc continuously"""
while True:
await asyncio.sleep(self._event_pipe_gc_seconds)
try:
await self._event_pipe_gc()
except Exception as e:
print(f"Exception in IOPubThread._event_pipe_gc: {e}", file=sys.__stderr__)

async def _event_pipe_gc(self):
"""run a single garbage collection on event pipes"""
if not self._event_pipes:
# don't acquire the lock if there's nothing to do
return
with self._event_pipe_gc_lock:
for thread, socket in list(self._event_pipes.items()):
if not thread.is_alive():
socket.close()
del self._event_pipes[thread]

@property
def _event_pipe(self):
"""thread-local event pipe for signaling events that should be processed in the thread"""
Expand All @@ -100,9 +134,11 @@ def _event_pipe(self):
event_pipe.linger = 0
event_pipe.connect(self._event_interface)
self._local.event_pipe = event_pipe
# WeakSet so that event pipes will be closed by garbage collection
# when their threads are terminated
self._event_pipes.add(event_pipe)
# associate event pipes to their threads
# so they can be closed explicitly
# implicit close on __del__ throws a ResourceWarning
with self._event_pipe_gc_lock:
self._event_pipes[threading.current_thread()] = event_pipe
return event_pipe

def _handle_event(self, msg):
Expand Down Expand Up @@ -188,7 +224,7 @@ def stop(self):
# close *all* event pipes, created in any thread
# event pipes can only be used from other threads while self.thread.is_alive()
# so after thread.join, this should be safe
for event_pipe in self._event_pipes:
for _thread, event_pipe in self._event_pipes.items():
event_pipe.close()

def close(self):
Expand Down
35 changes: 35 additions & 0 deletions ipykernel/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import os
import subprocess
import sys
import threading
import time
import warnings
from concurrent.futures import Future, ThreadPoolExecutor
from unittest import mock

import pytest
Expand Down Expand Up @@ -114,6 +116,39 @@ def test_outstream(iopub_thread):
assert stream.writable()


async def test_event_pipe_gc(iopub_thread):
session = Session(key=b'abc')
stream = OutStream(
session,
iopub_thread,
"stdout",
isatty=True,
watchfd=False,
)
save_stdout = sys.stdout
assert iopub_thread._event_pipes == {}
with stream, mock.patch.object(sys, "stdout", stream), ThreadPoolExecutor(1) as pool:
pool.submit(print, "x").result()
pool_thread = pool.submit(threading.current_thread).result()
assert list(iopub_thread._event_pipes) == [pool_thread]

# run gc once in the iopub thread
f: Future = Future()

async def run_gc():
try:
await iopub_thread._event_pipe_gc()
except Exception as e:
f.set_exception(e)
else:
f.set_result(None)

iopub_thread.io_loop.add_callback(run_gc)
# wait for call to finish in iopub thread
f.result()
assert iopub_thread._event_pipes == {}


def subprocess_test_echo_watch():
# handshake Pub subscription
session = Session(key=b'abc')
Expand Down

0 comments on commit 112ca66

Please sign in to comment.