From 2dc03a1e9a67eb10fb1ca061451d6d2b996d187e Mon Sep 17 00:00:00 2001 From: Kimmie Date: Sun, 13 Oct 2019 16:14:00 +0700 Subject: [PATCH 1/4] =?UTF-8?q?Allow=20RedisMutex=E2=80=99s=20Locking=20du?= =?UTF-8?q?ration=20and=20polling=20interval=20to=20be=20customizable=20?= =?UTF-8?q?=09-=20Update=20the=20configuration=20file=20with=202=20new=20c?= =?UTF-8?q?ustomizable=20fields:=20locking=5Fduration=20and=20polling=5Fin?= =?UTF-8?q?ternal?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/gush/configuration.rb | 24 ++++++++++++++---------- spec/gush/configuration_spec.rb | 6 ++++++ 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/lib/gush/configuration.rb b/lib/gush/configuration.rb index 55fd50e..40118f3 100644 --- a/lib/gush/configuration.rb +++ b/lib/gush/configuration.rb @@ -1,17 +1,19 @@ module Gush class Configuration - attr_accessor :concurrency, :namespace, :redis_url, :ttl + attr_accessor :concurrency, :namespace, :redis_url, :ttl, :locking_duration, :polling_interval def self.from_json(json) new(Gush::JSON.decode(json, symbolize_keys: true)) end def initialize(hash = {}) - self.concurrency = hash.fetch(:concurrency, 5) - self.namespace = hash.fetch(:namespace, 'gush') - self.redis_url = hash.fetch(:redis_url, 'redis://localhost:6379') - self.gushfile = hash.fetch(:gushfile, 'Gushfile') - self.ttl = hash.fetch(:ttl, -1) + self.concurrency = hash.fetch(:concurrency, 5) + self.namespace = hash.fetch(:namespace, 'gush') + self.redis_url = hash.fetch(:redis_url, 'redis://localhost:6379') + self.gushfile = hash.fetch(:gushfile, 'Gushfile') + self.ttl = hash.fetch(:ttl, -1) + self.locking_duration = hash.fetch(:locking_duration, 2) # how long you want to wait for the lock to be released, in seconds + self.polling_interval = hash.fetch(:polling_internal, 0.3) # how long the polling interval should be, in seconds end def gushfile=(path) @@ -24,10 +26,12 @@ def gushfile def to_hash { - concurrency: concurrency, - namespace: namespace, - redis_url: redis_url, - ttl: ttl + concurrency: concurrency, + namespace: namespace, + redis_url: redis_url, + ttl: ttl, + locking_duration: locking_duration, + polling_interval: polling_interval } end diff --git a/spec/gush/configuration_spec.rb b/spec/gush/configuration_spec.rb index b2bb9a6..9398c26 100644 --- a/spec/gush/configuration_spec.rb +++ b/spec/gush/configuration_spec.rb @@ -8,6 +8,8 @@ expect(subject.concurrency).to eq(5) expect(subject.namespace).to eq('gush') expect(subject.gushfile).to eq(GUSHFILE.realpath) + expect(subject.locking_duration).to eq(2) + expect(subject.polling_interval).to eq(0.3) end describe "#configure" do @@ -15,10 +17,14 @@ Gush.configure do |config| config.redis_url = "redis://localhost" config.concurrency = 25 + config.locking_duration = 5 + config.polling_interval = 0.5 end expect(Gush.configuration.redis_url).to eq("redis://localhost") expect(Gush.configuration.concurrency).to eq(25) + expect(Gush.configuration.locking_duration).to eq(5) + expect(Gush.configuration.polling_interval).to eq(0.5) end end end From 08ff96761bcf3bc3ea9d0fcaf5daddbe0558f97d Mon Sep 17 00:00:00 2001 From: Kimmie Date: Sun, 13 Oct 2019 16:48:52 +0700 Subject: [PATCH 2/4] =?UTF-8?q?Allow=20RedisMutex=E2=80=99s=20Locking=20du?= =?UTF-8?q?ration=20and=20polling=20interval=20to=20be=20customizable=20?= =?UTF-8?q?=09-=20call=20RedisMutex.with=5Flock=20with=20the=20customizabl?= =?UTF-8?q?e=20polling=5Finterval=20and=20locking=5Fduration?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/gush/worker.rb | 12 ++++++++++-- spec/gush/worker_spec.rb | 8 ++++++++ spec/spec_helper.rb | 6 ++++-- 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/lib/gush/worker.rb b/lib/gush/worker.rb index 7c70eb0..6578cd1 100644 --- a/lib/gush/worker.rb +++ b/lib/gush/worker.rb @@ -30,12 +30,16 @@ def perform(workflow_id, job_id) private - attr_reader :client, :workflow_id, :job + attr_reader :client, :workflow_id, :job, :configuration def client @client ||= Gush::Client.new(Gush.configuration) end + def configuration + @configuration ||= client.configuration + end + def setup_job(workflow_id, job_id) @workflow_id = workflow_id @job ||= client.find_job(workflow_id, job_id) @@ -73,7 +77,11 @@ def elapsed(start) def enqueue_outgoing_jobs job.outgoing.each do |job_name| - RedisMutex.with_lock("gush_enqueue_outgoing_jobs_#{workflow_id}-#{job_name}", sleep: 0.3, block: 2) do + RedisMutex.with_lock( + "gush_enqueue_outgoing_jobs_#{workflow_id}-#{job_name}", + sleep: configuration.polling_interval, + block: configuration.locking_duration + ) do out = client.find_job(workflow_id, job_name) if out.ready_to_start? diff --git a/spec/gush/worker_spec.rb b/spec/gush/worker_spec.rb index b023538..21c9c10 100644 --- a/spec/gush/worker_spec.rb +++ b/spec/gush/worker_spec.rb @@ -4,6 +4,8 @@ subject { described_class.new } let!(:workflow) { TestWorkflow.create } + let(:locking_duration) { 5 } + let(:polling_interval) { 0.5 } let!(:job) { client.find_job(workflow.id, "Prepare") } let(:config) { Gush.configuration.to_json } let!(:client) { Gush::Client.new } @@ -71,5 +73,11 @@ def configure subject.perform(workflow.id, 'OkayJob') end + + it 'calls RedisMutex.with_lock with customizable locking_duration and polling_interval' do + expect(RedisMutex).to receive(:with_lock) + .with(anything, block: 5, sleep: 0.5).twice + subject.perform(workflow.id, 'Prepare') + end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 45b233e..c925ed0 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -104,8 +104,10 @@ def job_with_id(job_name) clear_performed_jobs Gush.configure do |config| - config.redis_url = REDIS_URL - config.gushfile = GUSHFILE + config.redis_url = REDIS_URL + config.gushfile = GUSHFILE + config.locking_duration = defined?(locking_duration) ? locking_duration : 2 + config.polling_interval = defined?(polling_interval) ? polling_interval : 0.3 end end From 4748bbe90134e3955649dbe03642cd4555505742 Mon Sep 17 00:00:00 2001 From: Kimmie Date: Sun, 13 Oct 2019 17:08:02 +0700 Subject: [PATCH 3/4] Update README with the locking options --- README.md | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index e242839..e8f394a 100644 --- a/README.md +++ b/README.md @@ -348,9 +348,23 @@ This requires that you have imagemagick installed on your computer: bundle exec gush viz ``` +### Customizing locking options + +In order to prevent getting the RedisMutex::LockError error when having a large number of jobs, you can customize these 2 fields `locking_duration` and `polling_interval` as below + +```ruby +# config/initializers/gush.rb +Gush.configure do |config| + config.redis_url = "redis://localhost:6379" + config.concurrency = 5 + config.locking_duration = 2 # how long you want to wait for the lock to be released, in seconds + config.polling_interval = 0.3 # how long the polling interval should be, in seconds +end +``` + ### Cleaning up afterwards -Running `NotifyWorkflow.create` inserts multiple keys into Redis every time it is ran. This data might be useful for analysis but at a certain point it can be purged via Redis TTL. By default gush and Redis will keep keys forever. To configure expiration you need to 2 things. Create initializer (specify config.ttl in seconds, be different per environment). +Running `NotifyWorkflow.create` inserts multiple keys into Redis every time it is ran. This data might be useful for analysis but at a certain point it can be purged via Redis TTL. By default gush and Redis will keep keys forever. To configure expiration you need to 2 things. Create initializer (specify config.ttl in seconds, be different per environment). ```ruby # config/initializers/gush.rb @@ -361,7 +375,7 @@ Gush.configure do |config| end ``` -And you need to call `flow.expire!` (optionally passing custom TTL value overriding `config.ttl`). This gives you control whether to expire data for specific workflow. Best NOT to set TTL to be too short (like minutes) but about a week in length. And you can run `Client.expire_workflow` and `Client.expire_job` passing appropriate IDs and TTL (pass -1 to NOT expire) values. +And you need to call `flow.expire!` (optionally passing custom TTL value overriding `config.ttl`). This gives you control whether to expire data for specific workflow. Best NOT to set TTL to be too short (like minutes) but about a week in length. And you can run `Client.expire_workflow` and `Client.expire_job` passing appropriate IDs and TTL (pass -1 to NOT expire) values. ### Avoid overlapping workflows From 4d14ac9722484d84eb9a267fb42908cf301c54fb Mon Sep 17 00:00:00 2001 From: Kimmie Date: Sun, 13 Oct 2019 17:11:55 +0700 Subject: [PATCH 4/4] Add locking options to Gush::CLI --- lib/gush/cli.rb | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/gush/cli.rb b/lib/gush/cli.rb index 9bf16cf..f542897 100644 --- a/lib/gush/cli.rb +++ b/lib/gush/cli.rb @@ -12,11 +12,13 @@ class CLI < Thor def initialize(*) super Gush.configure do |config| - config.gushfile = options.fetch("gushfile", config.gushfile) - config.concurrency = options.fetch("concurrency", config.concurrency) - config.redis_url = options.fetch("redis", config.redis_url) - config.namespace = options.fetch("namespace", config.namespace) - config.ttl = options.fetch("ttl", config.ttl) + config.gushfile = options.fetch("gushfile", config.gushfile) + config.concurrency = options.fetch("concurrency", config.concurrency) + config.redis_url = options.fetch("redis", config.redis_url) + config.namespace = options.fetch("namespace", config.namespace) + config.ttl = options.fetch("ttl", config.ttl) + config.locking_duration = options.fetch("locking_duration", config.locking_duration) + config.polling_interval = options.fetch("polling_interval", config.polling_interval) end load_gushfile end