Skip to content

Commit

Permalink
Allow setting queue for each job
Browse files Browse the repository at this point in the history
  • Loading branch information
aeris authored and devilankur18 committed Aug 12, 2018
1 parent 49000a3 commit 1befa38
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 7 deletions.
4 changes: 2 additions & 2 deletions lib/gush/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ def create_workflow(name)
rescue NameError
raise WorkflowNotFound.new("Workflow with given name doesn't exist")
end
flow
end

def start_workflow(workflow, job_names = [])
Expand Down Expand Up @@ -162,8 +161,9 @@ def expire_job(workflow_id, job, ttl=nil)
def enqueue_job(workflow_id, job)
job.enqueue!
persist_job(workflow_id, job)
queue = job.queue || configuration.namespace

Gush::Worker.set(queue: configuration.namespace).perform_later(*[workflow_id, job.name])
Gush::Worker.set(queue: queue).perform_later(*[workflow_id, job.name])
end

private
Expand Down
13 changes: 12 additions & 1 deletion lib/gush/job.rb
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
module Gush
class Job
attr_accessor :workflow_id, :incoming, :outgoing, :params,
:finished_at, :failed_at, :started_at, :enqueued_at, :payloads, :klass
:finished_at, :failed_at, :started_at, :enqueued_at, :payloads, :klass, :queue
attr_reader :name, :output_payload, :params

def initialize(opts = {})
options = opts.dup
assign_variables(options)
end

def payload(clazz)
payload = payloads.detect { |f| f[:class] == clazz.name }
raise "Unable to find payload for #{clazz}, available: #{payloads.collect { |f| f[:class]}}" unless payload
payload[:output]
end

def as_json
{
name: name,
klass: self.class.to_s,
queue: queue,
incoming: incoming,
outgoing: outgoing,
finished_at: finished_at,
Expand Down Expand Up @@ -98,6 +105,9 @@ def has_no_dependencies?
end

private
def logger
Rails.logger
end

def client
@client ||= Client.new
Expand All @@ -119,6 +129,7 @@ def assign_variables(opts)
@klass = opts[:klass]
@output_payload = opts[:output_payload]
@workflow_id = opts[:workflow_id]
@queue = opts[:queue]
end
end
end
36 changes: 33 additions & 3 deletions lib/gush/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,46 @@ def elapsed(start)
(Time.now - start).to_f.round(3)
end

def enqueue_outgoing_jobs
job.outgoing.each do |job_name|

def enqueue_outgoing_job(workflow_id, job_name)
retry_count = 0
begin
RedisMutex.with_lock("gush_enqueue_outgoing_jobs_#{workflow_id}-#{job_name}", sleep: 0.3, block: 2) do
out = client.find_job(workflow_id, job_name)

if out.ready_to_start?
puts "enqueing the job for execution #{out.to_s}"
client.enqueue_job(workflow_id, out)
end
out
end
rescue RedisMutex::LockError
retry_count += 1

# Retry after some time
if retry_count <= 5
sleep 5
retry
else
false
end
end
end

def enqueue_outgoing_jobs
completed_jobs = []

job.outgoing.each do |job_name|
# First check if job is already running or not
# status, out = enqueue_outgoing_job(workflow_id, job_name)
out = client.find_job(workflow_id, job_name)
if out.ready_to_start?
out = enqueue_outgoing_job(workflow_id, job_name)
completed_jobs << out if out != false
else
completed_jobs << out
end
end
end

end
end
3 changes: 2 additions & 1 deletion lib/gush/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ def run(klass, opts = {})
node = klass.new({
workflow_id: id,
name: client.next_free_job_id(id, klass.to_s),
params: opts.fetch(:params, {})
params: opts.fetch(:params, {}),
queue: opts[:queue]
})

jobs << node
Expand Down

0 comments on commit 1befa38

Please sign in to comment.