From 1befa381315ec5f32e3eea6be5bf41793de29ea3 Mon Sep 17 00:00:00 2001 From: aeris Date: Thu, 19 Oct 2017 18:22:28 +0200 Subject: [PATCH] Allow setting queue for each job --- lib/gush/client.rb | 4 ++-- lib/gush/job.rb | 13 ++++++++++++- lib/gush/worker.rb | 36 +++++++++++++++++++++++++++++++++--- lib/gush/workflow.rb | 3 ++- 4 files changed, 49 insertions(+), 7 deletions(-) diff --git a/lib/gush/client.rb b/lib/gush/client.rb index fb6fac9..4ec86a3 100644 --- a/lib/gush/client.rb +++ b/lib/gush/client.rb @@ -18,7 +18,6 @@ def create_workflow(name) rescue NameError raise WorkflowNotFound.new("Workflow with given name doesn't exist") end - flow end def start_workflow(workflow, job_names = []) @@ -162,8 +161,9 @@ def expire_job(workflow_id, job, ttl=nil) def enqueue_job(workflow_id, job) job.enqueue! persist_job(workflow_id, job) + queue = job.queue || configuration.namespace - Gush::Worker.set(queue: configuration.namespace).perform_later(*[workflow_id, job.name]) + Gush::Worker.set(queue: queue).perform_later(*[workflow_id, job.name]) end private diff --git a/lib/gush/job.rb b/lib/gush/job.rb index 62b7931..26b5007 100644 --- a/lib/gush/job.rb +++ b/lib/gush/job.rb @@ -1,7 +1,7 @@ module Gush class Job attr_accessor :workflow_id, :incoming, :outgoing, :params, - :finished_at, :failed_at, :started_at, :enqueued_at, :payloads, :klass + :finished_at, :failed_at, :started_at, :enqueued_at, :payloads, :klass, :queue attr_reader :name, :output_payload, :params def initialize(opts = {}) @@ -9,10 +9,17 @@ def initialize(opts = {}) assign_variables(options) end + def payload(clazz) + payload = payloads.detect { |f| f[:class] == clazz.name } + raise "Unable to find payload for #{clazz}, available: #{payloads.collect { |f| f[:class]}}" unless payload + payload[:output] + end + def as_json { name: name, klass: self.class.to_s, + queue: queue, incoming: incoming, outgoing: outgoing, finished_at: finished_at, @@ -98,6 +105,9 @@ def has_no_dependencies? end private + def logger + Rails.logger + end def client @client ||= Client.new @@ -119,6 +129,7 @@ def assign_variables(opts) @klass = opts[:klass] @output_payload = opts[:output_payload] @workflow_id = opts[:workflow_id] + @queue = opts[:queue] end end end diff --git a/lib/gush/worker.rb b/lib/gush/worker.rb index d17c3f7..8bedeab 100644 --- a/lib/gush/worker.rb +++ b/lib/gush/worker.rb @@ -65,16 +65,46 @@ def elapsed(start) (Time.now - start).to_f.round(3) end - def enqueue_outgoing_jobs - job.outgoing.each do |job_name| + + def enqueue_outgoing_job(workflow_id, job_name) + retry_count = 0 + begin RedisMutex.with_lock("gush_enqueue_outgoing_jobs_#{workflow_id}-#{job_name}", sleep: 0.3, block: 2) do out = client.find_job(workflow_id, job_name) - if out.ready_to_start? + puts "enqueing the job for execution #{out.to_s}" client.enqueue_job(workflow_id, out) end + out + end + rescue RedisMutex::LockError + retry_count += 1 + + # Retry after some time + if retry_count <= 5 + sleep 5 + retry + else + false end end end + + def enqueue_outgoing_jobs + completed_jobs = [] + + job.outgoing.each do |job_name| + # First check if job is already running or not + # status, out = enqueue_outgoing_job(workflow_id, job_name) + out = client.find_job(workflow_id, job_name) + if out.ready_to_start? + out = enqueue_outgoing_job(workflow_id, job_name) + completed_jobs << out if out != false + else + completed_jobs << out + end + end + end + end end diff --git a/lib/gush/workflow.rb b/lib/gush/workflow.rb index e60f56c..2134ae0 100644 --- a/lib/gush/workflow.rb +++ b/lib/gush/workflow.rb @@ -109,7 +109,8 @@ def run(klass, opts = {}) node = klass.new({ workflow_id: id, name: client.next_free_job_id(id, klass.to_s), - params: opts.fetch(:params, {}) + params: opts.fetch(:params, {}), + queue: opts[:queue] }) jobs << node