From 112ca66da0ee8156b983094b2c8e2926ed63cfcb Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 12 Jun 2023 22:13:12 +0200 Subject: [PATCH] Avoid ResourceWarning on implicitly closed event pipe sockets (#1125) Co-authored-by: Steven Silvester Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- ipykernel/iostream.py | 50 ++++++++++++++++++++++++++++++++------ ipykernel/tests/test_io.py | 35 ++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 7 deletions(-) diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 8b5e47b3..a1e13845 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -3,6 +3,7 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. +import asyncio import atexit import io import os @@ -14,8 +15,7 @@ from collections import deque from io import StringIO, TextIOBase from threading import local -from typing import Any, Callable, Deque, Optional -from weakref import WeakSet +from typing import Any, Callable, Deque, Dict, Optional import zmq from jupyter_client.session import extract_header @@ -63,7 +63,10 @@ def __init__(self, socket, pipe=False): self._setup_pipe_in() self._local = threading.local() self._events: Deque[Callable[..., Any]] = deque() - self._event_pipes: WeakSet[Any] = WeakSet() + self._event_pipes: Dict[threading.Thread, Any] = {} + self._event_pipe_gc_lock: threading.Lock = threading.Lock() + self._event_pipe_gc_seconds: float = 10 + self._event_pipe_gc_task: Optional[asyncio.Task] = None self._setup_event_pipe() self.thread = threading.Thread(target=self._thread_main, name="IOPub") self.thread.daemon = True @@ -73,7 +76,18 @@ def __init__(self, socket, pipe=False): def _thread_main(self): """The inner loop that's actually run in a thread""" + + def _start_event_gc(): + self._event_pipe_gc_task = asyncio.ensure_future(self._run_event_pipe_gc()) + + self.io_loop.run_sync(_start_event_gc) self.io_loop.start() + if self._event_pipe_gc_task is not None: + # cancel gc task to avoid pending task warnings + async def _cancel(): + self._event_pipe_gc_task.cancel() # type:ignore + + self.io_loop.run_sync(_cancel) self.io_loop.close(all_fds=True) def _setup_event_pipe(self): @@ -88,6 +102,26 @@ def _setup_event_pipe(self): self._event_puller = ZMQStream(pipe_in, self.io_loop) self._event_puller.on_recv(self._handle_event) + async def _run_event_pipe_gc(self): + """Task to run event pipe gc continuously""" + while True: + await asyncio.sleep(self._event_pipe_gc_seconds) + try: + await self._event_pipe_gc() + except Exception as e: + print(f"Exception in IOPubThread._event_pipe_gc: {e}", file=sys.__stderr__) + + async def _event_pipe_gc(self): + """run a single garbage collection on event pipes""" + if not self._event_pipes: + # don't acquire the lock if there's nothing to do + return + with self._event_pipe_gc_lock: + for thread, socket in list(self._event_pipes.items()): + if not thread.is_alive(): + socket.close() + del self._event_pipes[thread] + @property def _event_pipe(self): """thread-local event pipe for signaling events that should be processed in the thread""" @@ -100,9 +134,11 @@ def _event_pipe(self): event_pipe.linger = 0 event_pipe.connect(self._event_interface) self._local.event_pipe = event_pipe - # WeakSet so that event pipes will be closed by garbage collection - # when their threads are terminated - self._event_pipes.add(event_pipe) + # associate event pipes to their threads + # so they can be closed explicitly + # implicit close on __del__ throws a ResourceWarning + with self._event_pipe_gc_lock: + self._event_pipes[threading.current_thread()] = event_pipe return event_pipe def _handle_event(self, msg): @@ -188,7 +224,7 @@ def stop(self): # close *all* event pipes, created in any thread # event pipes can only be used from other threads while self.thread.is_alive() # so after thread.join, this should be safe - for event_pipe in self._event_pipes: + for _thread, event_pipe in self._event_pipes.items(): event_pipe.close() def close(self): diff --git a/ipykernel/tests/test_io.py b/ipykernel/tests/test_io.py index 6a9f6517..404657cb 100644 --- a/ipykernel/tests/test_io.py +++ b/ipykernel/tests/test_io.py @@ -4,8 +4,10 @@ import os import subprocess import sys +import threading import time import warnings +from concurrent.futures import Future, ThreadPoolExecutor from unittest import mock import pytest @@ -114,6 +116,39 @@ def test_outstream(iopub_thread): assert stream.writable() +async def test_event_pipe_gc(iopub_thread): + session = Session(key=b'abc') + stream = OutStream( + session, + iopub_thread, + "stdout", + isatty=True, + watchfd=False, + ) + save_stdout = sys.stdout + assert iopub_thread._event_pipes == {} + with stream, mock.patch.object(sys, "stdout", stream), ThreadPoolExecutor(1) as pool: + pool.submit(print, "x").result() + pool_thread = pool.submit(threading.current_thread).result() + assert list(iopub_thread._event_pipes) == [pool_thread] + + # run gc once in the iopub thread + f: Future = Future() + + async def run_gc(): + try: + await iopub_thread._event_pipe_gc() + except Exception as e: + f.set_exception(e) + else: + f.set_result(None) + + iopub_thread.io_loop.add_callback(run_gc) + # wait for call to finish in iopub thread + f.result() + assert iopub_thread._event_pipes == {} + + def subprocess_test_echo_watch(): # handshake Pub subscription session = Session(key=b'abc')