Skip to content

Commit

Permalink
Try to enqueue outgoing jobs in another worker (#71)
Browse files Browse the repository at this point in the history
* Try to enqueue outgoing jobs in another worker

# Summary

This commit is trying to fix issue RedisMutex::LockError on
#57.
Whenever we have the error, it simply enqueues another worker.
At the begin of the worker, if the job is succedeed, we simply call
`enqueue_outgoing_jobs` again.

* Delay next job's execution to avoid hammering the lock
  • Loading branch information
suonlight authored and pokonski committed Oct 10, 2019
1 parent 08696a2 commit 682c7fe
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 0 deletions.
8 changes: 8 additions & 0 deletions lib/gush/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ class Worker < ::ActiveJob::Base
def perform(workflow_id, job_id)
setup_job(workflow_id, job_id)

if job.succeeded?
# Try to enqueue outgoing jobs again because the last job has redis mutex lock error
enqueue_outgoing_jobs
return
end

job.payloads = incoming_payloads

error = nil
Expand Down Expand Up @@ -75,6 +81,8 @@ def enqueue_outgoing_jobs
end
end
end
rescue RedisMutex::LockError
Worker.set(wait: 2.seconds).perform_later(workflow_id, job.name)
end
end
end
12 changes: 12 additions & 0 deletions spec/gush/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ def configure
end
end

context 'when job failed to enqueue outgoing jobs' do
it 'enqeues another job to handling enqueue_outgoing_jobs' do
allow(RedisMutex).to receive(:with_lock).and_raise(RedisMutex::LockError)
subject.perform(workflow.id, 'Prepare')
expect(Gush::Worker).to have_no_jobs(workflow.id, jobs_with_id(["FetchFirstJob", "FetchSecondJob"]))

allow(RedisMutex).to receive(:with_lock).and_call_original
perform_one
expect(Gush::Worker).to have_jobs(workflow.id, jobs_with_id(["FetchFirstJob", "FetchSecondJob"]))
end
end

it "calls job.perform method" do
SPY = double()
expect(SPY).to receive(:some_method)
Expand Down
13 changes: 13 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ def job_with_id(job_name)
end
end

RSpec::Matchers.define :have_no_jobs do |flow, jobs|
match do |actual|
expected = jobs.map do |job|
hash_including(args: include(flow, job))
end
expect(ActiveJob::Base.queue_adapter.enqueued_jobs).not_to match_array(expected)
end

failure_message do |actual|
"expected queue to have no #{jobs}, but instead has: #{ActiveJob::Base.queue_adapter.enqueued_jobs.map{ |j| j[:args][1]}}"
end
end

RSpec.configure do |config|
config.include ActiveJob::TestHelper
config.include GushHelpers
Expand Down

0 comments on commit 682c7fe

Please sign in to comment.