Skip to content
This repository has been archived by the owner on Feb 11, 2023. It is now read-only.

Commit

Permalink
ENH: separate Worker into QtWorker and ThreadWorker, see #35
Browse files Browse the repository at this point in the history
  • Loading branch information
leloup314 committed Jul 17, 2020
1 parent ae87984 commit b8a2db3
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 61 deletions.
4 changes: 2 additions & 2 deletions irrad_control/gui/tabs/setup_tab.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from irrad_control import network_config, daq_config, config_path
from irrad_control.devices.adc import ads1256
from irrad_control.utils.logger import log_levels
from irrad_control.utils.worker import Worker
from irrad_control.utils.worker import QtWorker
from irrad_control.gui.widgets import GridContainer, NoBackgroundScrollArea
from collections import OrderedDict
from copy import deepcopy
Expand Down Expand Up @@ -452,7 +452,7 @@ def _add_to_known_servers(self, ip):
def find_servers(self):

self.label_status.setText("Finding server(s)...")
self.threadpool.start(Worker(func=self._find_available_servers))
self.threadpool.start(QtWorker(func=self._find_available_servers))

def _find_available_servers(self, timeout=10):

Expand Down
14 changes: 7 additions & 7 deletions irrad_control/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

# Package imports
from irrad_control.utils.logger import CustomHandler, LoggingStream, log_levels
from irrad_control.utils.worker import Worker
from irrad_control.utils.worker import QtWorker
from irrad_control.utils.proc_manager import ProcessManager
from irrad_control.gui.widgets import DaqInfoWidget, LoggingWidget
from irrad_control.gui.tabs import IrradSetupTab, IrradControlTab, IrradMonitorTab
Expand Down Expand Up @@ -255,10 +255,10 @@ def handle_log(self, log_dict):
def _init_recv_threads(self):

# Start receiving log messages from other processes
self.threadpool.start(Worker(func=self.recv_log))
self.threadpool.start(QtWorker(func=self.recv_log))

# Start receiving data from other processes
self.threadpool.start(Worker(func=self.recv_data))
self.threadpool.start(QtWorker(func=self.recv_data))

def _init_processes(self):

Expand All @@ -269,7 +269,7 @@ def _init_processes(self):
self.proc_mngr.connect_to_server(hostname=server, username='pi')

# Prepare server in QThread on init
server_config_workers[server] = Worker(func=self.proc_mngr.configure_server, hostname=server, branch='development', git_pull=True)
server_config_workers[server] = QtWorker(func=self.proc_mngr.configure_server, hostname=server, branch='development', git_pull=True)

# Connect workers finish signal to starting process on server
server_config_workers[server].signals.finished.connect(lambda _server=server: self.start_server(_server))
Expand Down Expand Up @@ -333,7 +333,7 @@ def _start_daq_proc(self, hostname, ignore_orphaned=False):

# All servers have been launched; start collecting info
if all(server in self.proc_mngr.launched_procs for server in self.setup['server']):
proc_info_worker = Worker(func=self.collect_proc_infos)
proc_info_worker = QtWorker(func=self.collect_proc_infos)
proc_info_worker.signals.finished.connect(self._init_recv_threads)
proc_info_worker.signals.finished.connect(self.send_start_cmd)
self.threadpool.start(proc_info_worker)
Expand Down Expand Up @@ -370,7 +370,7 @@ def start_interpreter(self):
self._start_daq_proc(hostname='localhost')

def _connect_worker_exception(self, worker):
worker.signals.exceptionSignal.connect(lambda e, trace: logging.error("{} on sub-thread: {}".format(type(e).__name__, trace)))
worker.signals.exception.connect(lambda e, trace: logging.error("{} on sub-thread: {}".format(type(e).__name__, trace)))

def _connect_worker_close(self, worker, hostname):
self._cmd_reply[hostname].append(self._cmd_id)
Expand Down Expand Up @@ -479,7 +479,7 @@ def send_cmd(self, hostname, target, cmd, cmd_data=None, check_reply=True):
return

cmd_dict = {'target': target, 'cmd': cmd, 'data': cmd_data}
cmd_worker = Worker(self._send_cmd_get_reply, hostname, cmd_dict)
cmd_worker = QtWorker(self._send_cmd_get_reply, hostname, cmd_dict)

# Make connections
self._connect_worker_exception(worker=cmd_worker)
Expand Down
144 changes: 92 additions & 52 deletions irrad_control/utils/worker.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,107 @@
import traceback
from PyQt5 import QtCore
import threading

# Py2/3 compatibility
try:
ModuleNotFoundError
except NameError:
ModuleNotFoundError = ImportError

class WorkerSignals(QtCore.QObject):

finished = QtCore.pyqtSignal()
exceptionSignal = QtCore.pyqtSignal(Exception, str)
timeout = QtCore.pyqtSignal()
# If we can import PyQt5, we want a QtWorker as well as a ThreadWorker
_QT_WORKER = True
try:
from PyQt5 import QtCore
except ModuleNotFoundError:
_QT_WORKER = False

if _QT_WORKER:

class QtWorkerSignals(QtCore.QObject):

finished = QtCore.pyqtSignal()
exception = QtCore.pyqtSignal(Exception, str)
timeout = QtCore.pyqtSignal()


class QtWorker(QtCore.QRunnable):
"""
Implements a worker on which functions can be executed for multi-threading within Qt.
The worker is an instance of QRunnable, which can be started and handled automatically by Qt and its QThreadPool.
"""

def __init__(self, func, *args, **kwargs):
super(QtWorker, self).__init__()

# Main function which will be executed on this thread
self.func = func
# Arguments of main function
self.args = args
# Keyword arguments of main function
self.kwargs = kwargs

# Needs to be done this way since QRunnable cannot emit signals; QObject needed
self.signals = QtWorkerSignals()

# Timer to inform that a timeout occurred
self.timer = QtCore.QTimer()
self.timer.setSingleShot(True)
self.timeout = None

def set_timeout(self, timeout):
self.timeout = int(timeout)

class Worker(QtCore.QRunnable):
@QtCore.pyqtSlot()
def run(self):
"""
Runs the function func with given arguments args and keyword arguments kwargs.
If errors or exceptions occur, a signal sends the exception to main thread.
"""

# Start timer if needed
if self.timeout is not None:
self.timer.timeout.connect(self.signals.timeout.emit())
self.timer.start(self.timeout)

try:
if self.args and self.kwargs:
self.func(*self.args, **self.kwargs)
elif self.args:
self.func(*self.args)
elif self.kwargs:
self.func(**self.kwargs)
else:
self.func()

except Exception as e:
# Format traceback and send
trc_bck = traceback.format_exc()
# Emit exception signal
self.signals.exception.emit(e, trc_bck)

self.signals.finished.emit()


class ThreadWorker(threading.Thread):
"""
Implements a worker on which functions can be executed for multi-threading within Qt.
The worker is an instance of QRunnable, which can be started and handled automatically by Qt and its QThreadPool.
Sub-class of threading.Thread which stores any exception which occurs during the Thread's 'run'-method.
"""

def __init__(self, *args, **kwargs):

def __init__(self, func, *args, **kwargs):
super(Worker, self).__init__()

# Main function which will be executed on this thread
self.func = func
# Arguments of main function
self.args = args
# Keyword arguments of main function
self.kwargs = kwargs

# Needs to be done this way since QRunnable cannot emit signals; QObject needed
self.signals = WorkerSignals()
# Name thread according to function which is executed
if 'name' not in kwargs:
if 'target' in kwargs and kwargs['target'] is not None:
kwargs['name'] = kwargs['target'].__name__

# Timer to inform that a timeout occurred
self.timer = QtCore.QTimer()
self.timer.setSingleShot(True)
self.timeout = None
super(ThreadWorker, self).__init__(*args, **kwargs)

def set_timeout(self, timeout):
self.timeout = int(timeout)
# Init attributes holding exception and formatted traceback string
self.exception = self.traceback_str = None

@QtCore.pyqtSlot()
def run(self):
"""
Runs the function func with given arguments args and keyword arguments kwargs.
If errors or exceptions occur, a signal sends the exception to main thread.
"""

# Start timer if needed
if self.timeout is not None:
self.timer.timeout.connect(self.signals.timeout.emit())
self.timer.start(self.timeout)
"""Wraps original run method to store exceptions and traceback"""

try:
if self.args and self.kwargs:
self.func(*self.args, **self.kwargs)
elif self.args:
self.func(*self.args)
elif self.kwargs:
self.func(**self.kwargs)
else:
self.func()

super(ThreadWorker, self).run()
except Exception as e:
# Format traceback and send
trc_bck = traceback.format_exc()
# Emit exception signal
self.signals.exceptionSignal.emit(e, trc_bck)

self.signals.finished.emit()
self.exception, self.traceback_str = e, traceback.format_exc()

0 comments on commit b8a2db3

Please sign in to comment.