Skip to content

Commit

Permalink
Use ReduxMutex to make sure parallel ancenstors don't enqueue the sam…
Browse files Browse the repository at this point in the history
…e job multiple times
  • Loading branch information
pokonski committed Jul 10, 2018
1 parent 5ff9202 commit 49000a3
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 5 deletions.
3 changes: 2 additions & 1 deletion gush.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)

Gem::Specification.new do |spec|
spec.name = "gush"
spec.version = "1.1.1"
spec.version = "1.2.0"
spec.authors = ["Piotrek Okoński"]
spec.email = ["[email protected]"]
spec.summary = "Fast and distributed workflow runner based on ActiveJob and Redis"
Expand All @@ -21,6 +21,7 @@ Gem::Specification.new do |spec|
spec.add_dependency "connection_pool", "~> 2.2.1"
spec.add_dependency "multi_json", "~> 1.11"
spec.add_dependency "redis", ">= 3.2", "< 5"
spec.add_dependency "redis-mutex", "~> 4.0.1"
spec.add_dependency "hiredis", "~> 0.6"
spec.add_dependency "ruby-graphviz", "~> 1.2"
spec.add_dependency "terminal-table", "~> 1.4"
Expand Down
4 changes: 3 additions & 1 deletion lib/gush/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ def workflow_from_hash(hash, nodes = [])
end

def build_redis
Redis.new(url: configuration.redis_url)
Redis.new(url: configuration.redis_url).tap do |instance|
RedisClassy.redis = instance
end
end

def connection_pool
Expand Down
10 changes: 7 additions & 3 deletions lib/gush/worker.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'active_job'
require 'redis-mutex'

module Gush
class Worker < ::ActiveJob::Base
Expand Down Expand Up @@ -66,9 +67,12 @@ def elapsed(start)

def enqueue_outgoing_jobs
job.outgoing.each do |job_name|
out = client.find_job(workflow_id, job_name)
if out.ready_to_start?
client.enqueue_job(workflow_id, out)
RedisMutex.with_lock("gush_enqueue_outgoing_jobs_#{workflow_id}-#{job_name}", sleep: 0.3, block: 2) do
out = client.find_job(workflow_id, job_name)

if out.ready_to_start?
client.enqueue_job(workflow_id, out)
end
end
end
end
Expand Down
41 changes: 41 additions & 0 deletions spec/features/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,45 @@ def configure
expect(flow).to be_finished
expect(flow).to_not be_failed
end

it 'executes job with multiple ancestors only once' do
NO_DUPS_INTERNAL_SPY = double('spy')
expect(NO_DUPS_INTERNAL_SPY).to receive(:some_method).exactly(1).times

class FirstAncestor < Gush::Job
def perform
end
end

class SecondAncestor < Gush::Job
def perform
end
end

class FinalJob < Gush::Job
def perform
NO_DUPS_INTERNAL_SPY.some_method
end
end

class NoDuplicatesWorkflow < Gush::Workflow
def configure
run FirstAncestor
run SecondAncestor

run FinalJob, after: [FirstAncestor, SecondAncestor]
end
end

flow = NoDuplicatesWorkflow.create
flow.start!

5.times do
perform_one
end

flow = flow.reload
expect(flow).to be_finished
expect(flow).to_not be_failed
end
end

0 comments on commit 49000a3

Please sign in to comment.