refactor stop/pause/resume queue
Allie Crevier committed Feb 10, 2020
1 parent c883cd2 commit 1acb649
Showing 4 changed files with 138 additions and 90 deletions.
34 changes: 13 additions & 21 deletions securedrop_client/logic.py
@@ -324,6 +324,10 @@ def on_queue_paused(self) -> None:
     def resume_queues(self) -> None:
         self.api_job_queue.resume_queues()
+
+        # clear error status in case queue was paused resulting in a permanent error message with
+        # retry link
+        self.gui.clear_error_status()
 
     def completed_api_call(self, thread_id, user_callback):
         """
         Manage a completed API call. The actual result *may* be an exception or
@@ -371,10 +375,9 @@ def on_authenticate_success(self, result):
                                 self.session)
         self.gui.show_main_window(user)
         self.update_sources()
-        self.api_job_queue.login(self.api)
+        self.api_job_queue.start(self.api)
         self.api_sync.start(self.api)
         self.is_authenticated = True
-        self.resume_queues()
 
     def on_authenticate_failure(self, result: Exception) -> None:
         # Failed to authenticate. Reset state with failure message.
@@ -429,8 +432,6 @@ def on_sync_success(self) -> None:
         * Download new messages and replies
         * Update missing files so that they can be re-downloaded
         """
-        self.gui.clear_error_status()  # remove any permanent error status message
-
         with open(self.sync_flag, 'w') as f:
             f.write(arrow.now().format())
 
@@ -471,18 +472,10 @@ def update_sources(self):
         self.update_sync()
 
     def on_update_star_success(self, result) -> None:
-        """
-        After we star a source, we should sync the API such that the local database is updated.
-        """
-        self.gui.clear_error_status()  # remove any permanent error status message
+        pass
 
     def on_update_star_failure(self, result: UpdateStarJobException) -> None:
-        """
-        After we unstar a source, we should sync the API such that the local database is updated.
-        """
         logging.info("failed to push change to server")
-        error = _('Failed to update star.')
-        self.gui.update_error_status(error)
+        self.gui.update_error_status(_('Failed to update star.'))
 
     @login_required
     def update_star(self, source_db_object, callback):
@@ -503,6 +496,11 @@ def logout(self):
         Then mark all pending draft replies as failed, stop the queues, and show the user as logged
         out in the GUI.
         """
+
+        # clear error status in case queue was paused resulting in a permanent error message with
+        # retry link
+        self.gui.clear_error_status()
+
         if self.api is not None:
             self.call_api(self.api.logout, self.on_logout_success, self.on_logout_failure)
             self.invalidate_token()
@@ -512,7 +510,7 @@ def logout(self):
                 self.reply_failed.emit(failed_reply.uuid)
 
         self.api_sync.stop()
-        self.api_job_queue.logout()
+        self.api_job_queue.stop()
         self.gui.logout()
 
         self.is_authenticated = False
@@ -562,7 +560,6 @@ def on_message_download_success(self, uuid: str) -> None:
         """
         Called when a message has downloaded.
         """
-        self.gui.clear_error_status()  # remove any permanent error status message
         self.session.commit()  # Needed to flush stale data.
         message = storage.get_message(self.session, uuid)
         self.message_ready.emit(message.source.uuid, message.uuid, message.content)
@@ -587,7 +584,6 @@ def on_reply_download_success(self, uuid: str) -> None:
         """
         Called when a reply has downloaded.
         """
-        self.gui.clear_error_status()  # remove any permanent error status message
         self.session.commit()  # Needed to flush stale data.
         reply = storage.get_reply(self.session, uuid)
         self.reply_ready.emit(reply.source.uuid, reply.uuid, reply.content)
@@ -697,7 +693,6 @@ def on_file_download_success(self, uuid: Any) -> None:
         """
         Called when a file has downloaded.
         """
-        self.gui.clear_error_status()  # remove any permanent error status message
         self.session.commit()
         file_obj = storage.get_file(self.session, uuid)
         self.file_ready.emit(file_obj.source.uuid, uuid, file_obj.filename)
@@ -721,7 +716,6 @@ def on_delete_source_success(self, result) -> None:
         """
         # Delete the local version of the source.
         storage.delete_local_source_by_uuid(self.session, result)
-        self.gui.clear_error_status()  # remove any permanent error status message
         # Update the sources UI.
         self.update_sources()
 
@@ -782,7 +776,6 @@ def send_reply(self, source_uuid: str, reply_uuid: str, message: str) -> None:
 
     def on_reply_success(self, reply_uuid: str) -> None:
         logger.debug('{} sent successfully'.format(reply_uuid))
-        self.gui.clear_error_status()  # remove any permanent error status message
         self.session.commit()
         reply = storage.get_reply(self.session, reply_uuid)
         self.reply_succeeded.emit(reply.source.uuid, reply_uuid, reply.content)
@@ -801,7 +794,6 @@ def get_file(self, file_uuid: str) -> db.File:
 
     def on_logout_success(self, result) -> None:
         logging.info('Client logout successful')
-        self.gui.clear_error_status()  # remove any permanent error status message
 
     def on_logout_failure(self, result: Exception) -> None:
         logging.info('Client logout failure')
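
Note: the net effect of the logic.py changes above is that the Controller no longer clears the error status in every success callback; clearing now happens in exactly two places, resume_queues() and logout(). The following is a runnable standalone sketch of that flow, using stand-in classes rather than the client's real Gui and Controller (the error string is illustrative):

class GuiSketch:
    def update_error_status(self, message):
        print('permanent error with retry link:', message)

    def clear_error_status(self):
        print('error status cleared')


class QueueSketch:
    def resume_queues(self):
        print('resume signal emitted to queues')


class ControllerSketch:
    def __init__(self, gui, api_job_queue):
        self.gui = gui
        self.api_job_queue = api_job_queue

    def on_queue_paused(self):
        # a queue hit a RequestTimeoutError and paused: show a retryable error
        self.gui.update_error_status('The SecureDrop server cannot be reached.')

    def resume_queues(self):
        self.api_job_queue.resume_queues()
        # clearing here removes the stale retry message once the user retries
        self.gui.clear_error_status()


controller = ControllerSketch(GuiSketch(), QueueSketch())
controller.on_queue_paused()   # queue paused, permanent error shown
controller.resume_queues()     # retry: queues resumed, error cleared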
106 changes: 65 additions & 41 deletions securedrop_client/queue.py
@@ -26,11 +26,15 @@ class RunnableQueue(QObject):
     job type. If multiple jobs of the same type are added to the queue then they are retrieved
     in FIFO order.
 
-    If a RequestTimeoutError or ApiInaccessibleError is encountered while processing a job, the
-    job will be added back to the queue and a pause signal will be emitted. However the processing
-    loop will not stop until a PauseQueueJob is retrieved. Once this happens, all processing stops.
-    New jobs can still be added, but the processing function will need to be called again in order
-    to resume. The processing loop is resumed when the resume signal is emitted.
+    If a RequestTimeoutError is encountered while processing a job, the job will be added back to
+    the queue, the processing loop will stop, and the paused signal will be emitted. New jobs can
+    still be added, but the processing function will need to be called again in order to resume. The
+    processing loop is resumed when the resume signal is emitted.
+
+    If an ApiInaccessibleError is encountered while processing a job, api_client will be set to
+    None and the processing loop will stop. If the queue is resumed before the queue manager
+    stops the queue thread, api_client will still be None and the next job will raise an
+    ApiInaccessibleError before it makes an api call, which will repeat this process.
 
     Any other exception encountered while processing a job is unexpected, so the queue will drop the
     job and continue on to processing the next job. The job itself is responsible for emitting the
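
Note: to make the retry semantics in the new docstring concrete, here is a minimal standalone sketch (not the client's actual process() implementation) of how a timed-out job is re-queued and the loop pauses:

import queue

class TimeoutSketch(Exception):
    """Stands in for RequestTimeoutError."""

def process_once(q, run_job):
    """Return True to keep processing, False when the loop should pause."""
    priority, job = q.get()
    try:
        run_job(job)
        return True
    except TimeoutSketch:
        q.put((priority, job))  # add the same job back for a later retry
        return False            # stop the loop; the caller emits paused

def flaky(job):
    raise TimeoutSketch()

jobs = queue.PriorityQueue()
jobs.put((13, 'download file'))
assert process_once(jobs, flaky) is False
assert jobs.qsize() == 1  # the job survived the timeout and awaits a resume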
@@ -40,7 +44,6 @@ class RunnableQueue(QObject):
 
     # These are the priorities for processing jobs. Lower numbers correspond to a higher priority.
     JOB_PRIORITIES = {
-        # TokenInvalidationJob: 10,  # Not yet implemented
         PauseQueueJob: 11,
         FileDownloadJob: 13,  # File downloads processed in separate queue
         DeleteSourceJob: 14,
@@ -50,30 +53,23 @@
         ReplyDownloadJob: 17,
     }
 
-    '''
-    Signal that is emitted when processing stops
-    '''
+    # Signal that is emitted when processing stops
     paused = pyqtSignal()
 
-    '''
-    Signal that is emitted to resume processing jobs
-    '''
+    # Signal that is emitted to resume processing jobs
     resume = pyqtSignal()
 
     def __init__(self, api_client: API, session_maker: scoped_session, size: int = 0) -> None:
-        """
-        A size of zero means there's no upper bound to the queue size.
-        """
         super().__init__()
         self.api_client = api_client
         self.session_maker = session_maker
+        # If the maxsize provided to PriorityQueue is 0, there will be no upper bound
        self.queue = PriorityQueue(maxsize=size)  # type: PriorityQueue[Tuple[int, ApiJob]]
+        # `order_number` ensures jobs with equal priority are retrieved in FIFO order. This is
+        # needed because PriorityQueue is implemented using heapq, which does not have sort
+        # stability. For more info, see: https://bugs.python.org/issue17794
        self.order_number = itertools.count()
 
+        # When the resume signal is emitted, restart the processing loop
+        self.resume.connect(self.process)
 
     def add_job(self, job: ApiJob) -> None:
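
Note: the order_number comment above relies on a standard trick: PriorityQueue (heapq underneath) is not stable, so a monotonically increasing counter breaks ties between equal priorities. The following is a standalone demonstration (the client attaches the counter to the ApiJob itself rather than using a bare tuple as here):

import itertools
from queue import PriorityQueue

order_number = itertools.count()
q = PriorityQueue()

q.put((11, next(order_number), 'first job at priority 11'))
q.put((11, next(order_number), 'second job at priority 11'))
q.put((13, next(order_number), 'job at priority 13'))

while not q.empty():
    priority, order, job = q.get()
    print(priority, order, job)
# 11 0 first job at priority 11    <- equal priorities come out in FIFO order
# 11 1 second job at priority 11
# 13 2 job at priority 13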
@@ -150,8 +146,16 @@ def process(self) -> None:
 
 class ApiJobQueue(QObject):
     '''
-    Signal that is emitted after a queue is paused
+    ApiJobQueue is the queue manager of two FIFO priority queues that process jobs of type ApiJob.
+
+    The queue manager starts the queues when a new auth token is provided to ensure jobs are able to
+    make their requests. It stops the queues whenever a MetadataSyncJob, which runs in a continuous
+    loop outside of the queue manager, encounters an ApiInaccessibleError and forces a logout
+    from the Controller.
     '''
 
+    # Signal that is emitted after a queue is paused.
     paused = pyqtSignal()
 
     def __init__(self, api_client: API, session_maker: scoped_session) -> None:
@@ -169,51 +173,71 @@ def __init__(self, api_client: API, session_maker: scoped_session) -> None:
         self.main_thread.started.connect(self.main_queue.process)
         self.download_file_thread.started.connect(self.download_file_queue.process)
 
-        self.main_queue.paused.connect(self.on_queue_paused)
-        self.download_file_queue.paused.connect(self.on_queue_paused)
+        self.main_queue.paused.connect(self.on_main_queue_paused)
+        self.download_file_queue.paused.connect(self.on_file_download_queue_paused)
 
-    def logout(self) -> None:
-        if self.main_thread.isRunning():
-            logger.debug('Stopping main queue thread')
-            self.main_thread.quit()
-
-        if self.download_file_thread.isRunning():
-            logger.debug('Stopping download queue thread')
-            self.download_file_thread.quit()
-
-    def login(self, api_client: API) -> None:
-        logger.debug('Passing API token to queues')
+    def start(self, api_client: API) -> None:
+        '''
+        Start the queues whenever a new api token is provided.
+        '''
         self.main_queue.api_client = api_client
         self.download_file_queue.api_client = api_client
-        self.start_queues()
 
-    def start_queues(self) -> None:
         if not self.main_thread.isRunning():
-            logger.debug('Starting main thread')
             self.main_thread.start()
+            logger.debug('Started main queue')
 
         if not self.download_file_thread.isRunning():
-            logger.debug('Starting download thread')
             self.download_file_thread.start()
+            logger.debug('Started file download queue')
 
-    def on_queue_paused(self) -> None:
+    def stop(self) -> None:
+        '''
+        Stop the queues.
+        '''
+        if self.main_thread.isRunning():
+            self.main_thread.quit()
+            logger.debug('Stopped main queue')
+
+        if self.download_file_thread.isRunning():
+            self.download_file_thread.quit()
+            logger.debug('Stopped file download queue')
+
+    @pyqtSlot()
+    def on_main_queue_paused(self) -> None:
+        '''
+        Emit the paused signal if the main queue has been paused.
+        '''
+        logger.debug('Paused main queue')
        self.paused.emit()
 
+    @pyqtSlot()
+    def on_file_download_queue_paused(self) -> None:
+        '''
+        Emit the paused signal if the file download queue has been paused.
+        '''
+        logger.debug('Paused file download queue')
+        self.paused.emit()
+
     def resume_queues(self) -> None:
+        '''
+        Emit the resume signal to the queues if they are running.
+        '''
+        if self.main_thread.isRunning():
+            logger.debug("Resuming main queue")
            self.main_queue.resume.emit()
+        if self.download_file_thread.isRunning():
+            logger.debug("Resuming download queue")
            self.download_file_queue.resume.emit()
 
     def enqueue(self, job: ApiJob) -> None:
-        # Prevent api jobs being added to the queue when not logged in.
-        if (not self.main_queue.api_client or not self.download_file_queue.api_client):
-            logger.info('Not adding job, we are not logged in')
+        '''
+        Enqueue the supplied job if the queues are running.
+        '''
+        if not self.main_thread.isRunning() or not self.download_file_thread.isRunning():
+            logger.debug('Not adding job before queues have been started.')
             return
 
-        # First check the queues are started in case they died for some reason.
-        self.start_queues()
-
         if isinstance(job, FileDownloadJob):
             logger.debug('Adding job to download queue')
             self.download_file_queue.add_job(job)
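
Note: taken together, the refactor replaces login()/logout()/start_queues() with an explicit start()/stop() lifecycle, and enqueue() now refuses jobs instead of silently restarting dead queues. Here is a runnable sketch of that lifecycle; QueueManagerSketch is a stand-in, not the real ApiJobQueue:

class QueueManagerSketch:
    def __init__(self):
        self.running = False
        self.api_client = None

    def start(self, api_client):
        # login: hand the fresh token to the queues, then start their threads
        self.api_client = api_client
        self.running = True

    def stop(self):
        # logout, or a sync that hit an ApiInaccessibleError: quit the threads
        self.running = False

    def enqueue(self, job):
        # jobs are refused until start() has run, rather than restarting queues
        if not self.running:
            print('Not adding job before queues have been started.')
            return
        print('queued:', job)


manager = QueueManagerSketch()
manager.enqueue('metadata sync')    # refused: queues not started
manager.start(api_client=object())  # on_authenticate_success calls start()
manager.enqueue('metadata sync')    # accepted
manager.stop()                      # logout() calls stop()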
