Refactor queue resume/stop/start #786

Merged: 4 commits, Feb 12, 2020
securedrop_client/logic.py (34 changes: 13 additions & 21 deletions)
@@ -326,6 +326,10 @@ def on_queue_paused(self) -> None:
def resume_queues(self) -> None:
self.api_job_queue.resume_queues()

+ # clear error status in case queue was paused resulting in a permanent error message with
+ # retry link
+ self.gui.clear_error_status()

def completed_api_call(self, thread_id, user_callback):
"""
Manage a completed API call. The actual result *may* be an exception or
@@ -373,10 +377,9 @@ def on_authenticate_success(self, result):
self.session)
self.gui.show_main_window(user)
self.update_sources()
- self.api_job_queue.login(self.api)
+ self.api_job_queue.start(self.api)
self.api_sync.start(self.api)
self.is_authenticated = True
- self.resume_queues()

def on_authenticate_failure(self, result: Exception) -> None:
# Failed to authenticate. Reset state with failure message.
@@ -431,8 +434,6 @@ def on_sync_success(self) -> None:
* Download new messages and replies
* Update missing files so that they can be re-downloaded
"""
- self.gui.clear_error_status() # remove any permanent error status message

with open(self.sync_flag, 'w') as f:
f.write(arrow.now().format())

@@ -473,18 +474,10 @@ def update_sources(self):
self.update_sync()

def on_update_star_success(self, result) -> None:
"""
After we star a source, we should sync the API such that the local database is updated.
"""
self.gui.clear_error_status() # remove any permanent error status message
pass

def on_update_star_failure(self, result: UpdateStarJobException) -> None:
"""
After we unstar a source, we should sync the API such that the local database is updated.
"""
logging.info("failed to push change to server")
error = _('Failed to update star.')
self.gui.update_error_status(error)
self.gui.update_error_status(_('Failed to update star.'))

@login_required
def update_star(self, source_db_object, callback):
@@ -505,6 +498,11 @@ def logout(self):
Then mark all pending draft replies as failed, stop the queues, and show the user as logged
out in the GUI.
"""

+ # clear error status in case queue was paused resulting in a permanent error message with
+ # retry link
+ self.gui.clear_error_status()

if self.api is not None:
self.call_api(self.api.logout, self.on_logout_success, self.on_logout_failure)
self.invalidate_token()
@@ -514,7 +512,7 @@ def logout(self):
self.reply_failed.emit(failed_reply.uuid)

self.api_sync.stop()
- self.api_job_queue.logout()
+ self.api_job_queue.stop()
self.gui.logout()

self.is_authenticated = False
@@ -564,7 +562,6 @@ def on_message_download_success(self, uuid: str) -> None:
"""
Called when a message has downloaded.
"""
- self.gui.clear_error_status() # remove any permanent error status message
self.session.commit() # Needed to flush stale data.
message = storage.get_message(self.session, uuid)
self.message_ready.emit(message.source.uuid, message.uuid, message.content)
@@ -589,7 +586,6 @@ def on_reply_download_success(self, uuid: str) -> None:
"""
Called when a reply has downloaded.
"""
- self.gui.clear_error_status() # remove any permanent error status message
self.session.commit() # Needed to flush stale data.
reply = storage.get_reply(self.session, uuid)
self.reply_ready.emit(reply.source.uuid, reply.uuid, reply.content)
@@ -700,7 +696,6 @@ def on_file_download_success(self, uuid: Any) -> None:
"""
Called when a file has downloaded.
"""
- self.gui.clear_error_status() # remove any permanent error status message
self.session.commit()
file_obj = storage.get_file(self.session, uuid)
self.file_ready.emit(file_obj.source.uuid, uuid, file_obj.filename)
@@ -728,7 +723,6 @@ def on_delete_source_success(self, result) -> None:
"""
# Delete the local version of the source.
storage.delete_local_source_by_uuid(self.session, result)
- self.gui.clear_error_status() # remove any permanent error status message
# Update the sources UI.
self.update_sources()

@@ -789,7 +783,6 @@ def send_reply(self, source_uuid: str, reply_uuid: str, message: str) -> None:

def on_reply_success(self, reply_uuid: str) -> None:
logger.debug('{} sent successfully'.format(reply_uuid))
- self.gui.clear_error_status() # remove any permanent error status message
self.session.commit()
reply = storage.get_reply(self.session, reply_uuid)
self.reply_succeeded.emit(reply.source.uuid, reply_uuid, reply.content)
@@ -808,7 +801,6 @@ def get_file(self, file_uuid: str) -> db.File:

def on_logout_success(self, result) -> None:
logging.info('Client logout successful')
- self.gui.clear_error_status() # remove any permanent error status message

def on_logout_failure(self, result: Exception) -> None:
logging.info('Client logout failure')
securedrop_client/queue.py (136 changes: 75 additions & 61 deletions)
@@ -2,7 +2,7 @@
import logging

from PyQt5.QtCore import QObject, QThread, pyqtSlot, pyqtSignal
- from queue import PriorityQueue, Full
+ from queue import PriorityQueue
from sdclientapi import API, RequestTimeoutError
from sqlalchemy.orm import scoped_session
from typing import Optional, Tuple # noqa: F401
@@ -26,11 +26,15 @@ class RunnableQueue(QObject):
job type. If multiple jobs of the same type are added to the queue then they are retrieved
in FIFO order.

- If a RequestTimeoutError or ApiInaccessibleError is encountered while processing a job, the
- job will be added back to the queue and a pause signal will be emitted. However the processing
- loop will not stop until a PauseQueueJob is retrieved. Once this happens, all processing stops.
- New jobs can still be added, but the processing function will need to be called again in order
- to resume. The processing loop is resumed when the resume signal is emitted.
+ If a RequestTimeoutError is encountered while processing a job, the job will be added back to
+ the queue, the processing loop will stop, and the paused signal will be emitted. New jobs can
+ still be added, but the processing function will need to be called again in order to resume. The
+ processing loop is resumed when the resume signal is emitted.
+
+ If an ApiInaccessibleError is encountered while processing a job, api_client will be set to
+ None and the processing loop will stop. If the queue is resumed before the queue manager
+ stops the queue thread, api_client will still be None and the next job will raise an
+ ApiInaccessibleError before it makes an api call, which will repeat this process.

Any other exception encountered while processing a job is unexpected, so the queue will drop the
job and continue on to processing the next job. The job itself is responsible for emitting the
@@ -40,7 +44,6 @@ class RunnableQueue(QObject):

# These are the priorities for processing jobs. Lower numbers correspond to a higher priority.
JOB_PRIORITIES = {
- # TokenInvalidationJob: 10, # Not yet implemented
PauseQueueJob: 11,
FileDownloadJob: 13, # File downloads processed in separate queue
DeleteSourceJob: 14,
@@ -50,30 +53,22 @@
ReplyDownloadJob: 17,
}

- '''
- Signal that is emitted when processing stops
- '''
+ # Signal that is emitted when processing stops
paused = pyqtSignal()

- '''
- Signal that is emitted to resume processing jobs
- '''
+ # Signal that is emitted to resume processing jobs
resume = pyqtSignal()

- def __init__(self, api_client: API, session_maker: scoped_session, size: int = 0) -> None:
- """
- A size of zero means there's no upper bound to the queue size.
- """
+ def __init__(self, api_client: API, session_maker: scoped_session) -> None:
super().__init__()
self.api_client = api_client
self.session_maker = session_maker
- self.queue = PriorityQueue(maxsize=size) # type: PriorityQueue[Tuple[int, ApiJob]]
+ self.queue = PriorityQueue() # type: PriorityQueue[Tuple[int, ApiJob]]
# `order_number` ensures jobs with equal priority are retrieved in FIFO order. This is needed
# because PriorityQueue is implemented using heapq which does not have sort stability. For
# more info, see: https://bugs.python.org/issue17794
self.order_number = itertools.count()

# Resume signals to resume processing
self.resume.connect(self.process)

def add_job(self, job: ApiJob) -> None:
@@ -83,10 +78,7 @@ def add_job(self, job: ApiJob) -> None:
current_order_number = next(self.order_number)
job.order_number = current_order_number
priority = self.JOB_PRIORITIES[type(job)]
- try:
- self.queue.put_nowait((priority, job))
- except Full:
- pass # Pass silently if the queue is full
+ self.queue.put_nowait((priority, job))

def re_add_job(self, job: ApiJob) -> None:
'''
@@ -95,10 +87,7 @@ def re_add_job(self, job: ApiJob) -> None:
'''
job.remaining_attempts = DEFAULT_NUM_ATTEMPTS
priority = self.JOB_PRIORITIES[type(job)]
- try:
- self.queue.put_nowait((priority, job))
- except Full:
- pass # Pass silently if the queue is full
+ self.queue.put_nowait((priority, job))
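
An aside on the `order_number` tie-breaker used by both methods above: PriorityQueue is backed by heapq, which is not sort-stable, so jobs with equal priority need the monotonically increasing counter to come out in FIFO order. A standalone sketch of just that mechanism (illustrative, not code from this PR):

import itertools
from queue import PriorityQueue

order_number = itertools.count()
q = PriorityQueue()
for name in ['first', 'second', 'third']:
    q.put((17, next(order_number), name))  # identical priority for all three

while not q.empty():
    print(q.get())  # (17, 0, 'first'), then (17, 1, 'second'), then (17, 2, 'third')

Without the counter, items sharing a priority would be ordered by comparing the payloads themselves, which is both unstable and, for arbitrary job objects, potentially a TypeError.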

@pyqtSlot()
def process(self) -> None:
@@ -120,38 +109,43 @@ def process(self) -> None:

Note: Generic exceptions are handled in _do_call_api.
'''
+ logger.debug('Beginning queue processing loop')

while True:
priority, job = self.queue.get(block=True)

if isinstance(job, PauseQueueJob):
+ logger.debug('Paused queue')
self.paused.emit()
return

try:
session = self.session_maker()
job._do_call_api(self.api_client, session)
except ApiInaccessibleError as e:
- logger.debug('Job {} raised an exception: {}: {}'.format(self, type(e).__name__, e))
+ logger.debug('{}: {}'.format(type(e).__name__, e))
self.api_client = None
return
except RequestTimeoutError as e:
- logger.debug('Job {} raised an exception: {}: {}'.format(self, type(e).__name__, e))
+ logger.debug('{}: {}'.format(type(e).__name__, e))
self.add_job(PauseQueueJob())
self.re_add_job(job)
except Exception as e:
- logger.error('Job {} raised an exception: {}: {}'.format(self, type(e).__name__, e))
- logger.error('Skipping job')
+ logger.error('{}: {}'.format(type(e).__name__, e))
+ logger.debug('Skipping job')
finally:
session.close()
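
To make the processing contract described in the class docstring concrete, here is a minimal framework-free sketch; PauseJob, Timeout, and the priority values are illustrative stand-ins rather than names from this codebase:

import itertools
from queue import PriorityQueue

PAUSE_PRIORITY, JOB_PRIORITY = 11, 15  # the pause sentinel outranks ordinary jobs
order = itertools.count()              # FIFO tie-breaker for equal priorities

class PauseJob:
    '''Sentinel that tells the loop to stop until process() is called again.'''

class Timeout(Exception):
    '''Stand-in for RequestTimeoutError.'''

def process(q: PriorityQueue) -> None:
    while True:
        _, _, job = q.get(block=True)
        if isinstance(job, PauseJob):
            return  # paused; calling process() again resumes from the queue
        try:
            job()
        except Timeout:
            # Pause first (higher priority), then retry the job after resume.
            q.put((PAUSE_PRIORITY, next(order), PauseJob()))
            q.put((JOB_PRIORITY, next(order), job))
        except Exception:
            pass  # unexpected failure: drop the job and keep processing

Because the pause sentinel carries a higher priority than the re-added job, it is retrieved first after a timeout, so the loop stops before retrying rather than spinning on a persistently failing job.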


class ApiJobQueue(QObject):
'''
- Signal that is emitted after a queue is paused
+ ApiJobQueue is the queue manager of two FIFO priority queues that process jobs of type ApiJob.
+
+ The queue manager starts the queues when a new auth token is provided to ensure jobs are able to
+ make their requests. It stops the queues whenever a MetadataSyncJob, which runs in a continuous
+ loop outside of the queue manager, encounters an ApiInaccessibleError and forces a logout
+ from the Controller.
'''

+ # Signal that is emitted after a queue is paused.
paused = pyqtSignal()

def __init__(self, api_client: API, session_maker: scoped_session) -> None:
@@ -169,54 +163,74 @@ def __init__(self, api_client: API, session_maker: scoped_session) -> None:
self.main_thread.started.connect(self.main_queue.process)
self.download_file_thread.started.connect(self.download_file_queue.process)

- self.main_queue.paused.connect(self.on_queue_paused)
- self.download_file_queue.paused.connect(self.on_queue_paused)
+ self.main_queue.paused.connect(self.on_main_queue_paused)
+ self.download_file_queue.paused.connect(self.on_file_download_queue_paused)

- def logout(self) -> None:
- if self.main_thread.isRunning():
- logger.debug('Stopping main queue thread')
- self.main_thread.quit()
-
- if self.download_file_thread.isRunning():
- logger.debug('Stopping download queue thread')
- self.download_file_thread.quit()
-
- def login(self, api_client: API) -> None:
- logger.debug('Passing API token to queues')
+ def start(self, api_client: API) -> None:
+ '''
+ Start the queues whenever a new api token is provided.
+ '''
self.main_queue.api_client = api_client
self.download_file_queue.api_client = api_client
- self.start_queues()

- def start_queues(self) -> None:
if not self.main_thread.isRunning():
- logger.debug('Starting main thread')
self.main_thread.start()
+ logger.debug('Started main queue')

if not self.download_file_thread.isRunning():
- logger.debug('Starting download thread')
self.download_file_thread.start()
+ logger.debug('Started file download queue')

- def on_queue_paused(self) -> None:
+ def stop(self) -> None:
+ '''
+ Stop the queues.
+ '''
+ if self.main_thread.isRunning():
+ self.main_thread.quit()
+ logger.debug('Stopped main queue')
+
+ if self.download_file_thread.isRunning():
+ self.download_file_thread.quit()
+ logger.debug('Stopped file download queue')

+ @pyqtSlot()
+ def on_main_queue_paused(self) -> None:
+ '''
+ Emit the paused signal if the main queue has been paused.
+ '''
+ logger.debug('Paused main queue')
self.paused.emit()
Contributor:
This is a little hard to follow because both ApiJobQueue and RunnableQueue have signals called paused which do slightly different things: the first (defined on ApiJobQueue) signals the Controller object "hey, I paused the queues, update the GUI"; the other (defined on RunnableQueue) is how a queue signals ApiJobQueue "hey, I hit a consistent request timeout, let's stop the queues and let the GUI know".

However, it looks like these queues pause independently: one can be stopped while the other is still running, even while the GUI shows that the server is unreachable. Is this intentional? Or is the idea that if one queue is stopped due to network failure, the other will eventually stop too?

Let me know if you think I'm missing something here.

@sssoleileraaa (Contributor, Author), Feb 11, 2020:
> Or is the idea that if one queue is stopped due to network failure, the other will eventually stop too?

Yes, this was the idea.

The way it has been implemented is that there is a paused signal in RunnableQueue, which maintains a priority queue and will pause when it encounters a request timeout error. Meanwhile, the other queues do not know about this, and cannot know unless ApiJobQueue (which really should be renamed to ApiJobQueueManager) tells them that another queue was paused. To keep things simple, the queue manager just sends a signal back to the GUI so the user can retry. It could also tell the other queue to pause by enqueuing a PauseQueueJob, but I think this is risky because the other queue might at the same time be adding a PauseQueueJob because it too has encountered a timeout error.

I think changing how this behavior works should be discussed further, and if we decide to change it I would advocate for doing so outside of this PR in order to leave it as a refactor-only PR.

@sssoleileraaa (Contributor, Author), Feb 12, 2020:
One thing we could do as a small additional refactor in this PR is replace the signal from RunnableQueue with a callback that ApiJobQueue passes to it, like this:

--- a/securedrop_client/queue.py
+++ b/securedrop_client/queue.py
@@ -53,16 +53,15 @@ class RunnableQueue(QObject):
         ReplyDownloadJob: 17,
     }
 
-    # Signal that is emitted when processing stops
-    paused = pyqtSignal()
-
     # Signal that is emitted to resume processing jobs
     resume = pyqtSignal()
 
-    def __init__(self, api_client: API, session_maker: scoped_session) -> None:
+    def __init__(self, api_client: API, session_maker: scoped_session, on_pause_callback) -> None:
         super().__init__()
         self.api_client = api_client
         self.session_maker = session_maker
+        self.on_pause_callback = on_pause_callback
+
         self.queue = PriorityQueue()  # type: PriorityQueue[Tuple[int, ApiJob]]
         # `order_number` ensures jobs with equal priority are retrived in FIFO order. This is needed
         # because PriorityQueue is implemented using heapq which does not have sort stability. For
@@ -113,7 +112,7 @@ class RunnableQueue(QObject):
             priority, job = self.queue.get(block=True)
 
             if isinstance(job, PauseQueueJob):
-                self.paused.emit()
+                self.on_pause_callback()
                 return
 
             try:
@@ -154,8 +153,9 @@ class ApiJobQueue(QObject):
         self.main_thread = QThread()
         self.download_file_thread = QThread()
 
-        self.main_queue = RunnableQueue(api_client, session_maker)
-        self.download_file_queue = RunnableQueue(api_client, session_maker)
+        self.main_queue = RunnableQueue(api_client, session_maker, self.on_main_queue_paused)
+        self.download_file_queue = RunnableQueue(
+            api_client, session_maker, self.on_file_download_queue_paused)
 
         self.main_queue.moveToThread(self.main_thread)
         self.download_file_queue.moveToThread(self.download_file_thread)


+ @pyqtSlot()
+ def on_file_download_queue_paused(self) -> None:
+ '''
+ Emit the paused signal if the file download queue has been paused.
+ '''
+ logger.debug('Paused file download queue')
+ self.paused.emit()

def resume_queues(self) -> None:
+ '''
+ Emit the resume signal to the queues if they are running.
+ '''
+ if self.main_thread.isRunning():
+ logger.debug("Resuming main queue")
self.main_queue.resume.emit()
+ if self.download_file_thread.isRunning():
+ logger.debug("Resuming download queue")
self.download_file_queue.resume.emit()

def enqueue(self, job: ApiJob) -> None:
- # Prevent api jobs being added to the queue when not logged in.
- if (not self.main_queue.api_client or not self.download_file_queue.api_client):
- logger.info('Not adding job, we are not logged in')
+ '''
+ Enqueue the supplied job if the queues are running.
+ '''
+ if not self.main_thread.isRunning() or not self.download_file_thread.isRunning():
+ logger.debug('Not adding job before queues have been started.')
return

- # First check the queues are started in case they died for some reason.
- self.start_queues()
-
if isinstance(job, FileDownloadJob):
- logger.debug('Adding job to download queue')
self.download_file_queue.add_job(job)
+ logger.debug('Added {} to download queue'.format(job.__class__.__name__))
else:
- logger.debug('Adding job to main queue')
self.main_queue.add_job(job)
+ logger.debug('Added {} to main queue'.format(job.__class__.__name__))
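
Taken together, the refactored lifecycle reads: start(api_client) on login, enqueue(job) while running, resume_queues() after a pause, and stop() on logout. A hypothetical driver, with api, session_maker, and job as stand-ins for the real objects wired up in the Controller:

from securedrop_client.queue import ApiJobQueue

def wire_up(api, session_maker, job) -> None:
    # Hypothetical sketch; the real wiring lives in Controller (logic.py).
    job_queue = ApiJobQueue(api, session_maker)
    job_queue.start(api)       # on login: hand the fresh auth token to both queues
    job_queue.enqueue(job)     # routed to the main or file download queue
    job_queue.resume_queues()  # e.g. after the user clicks the retry link
    job_queue.stop()           # on logout: quit both queue threads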