From 5baa63102ea99cb349ac4366c6cd2024f2489dfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Vortmann?= <57185+jvortmann@users.noreply.github.com> Date: Thu, 29 Feb 2024 13:53:43 -0300 Subject: [PATCH 1/8] Rename variable and move definitions to outer scope --- Change-type: test --- spec/sidekiq-queue-pause_spec.rb | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/spec/sidekiq-queue-pause_spec.rb b/spec/sidekiq-queue-pause_spec.rb index 79e7fd4..b80a4c6 100644 --- a/spec/sidekiq-queue-pause_spec.rb +++ b/spec/sidekiq-queue-pause_spec.rb @@ -2,30 +2,31 @@ describe Sidekiq::QueuePause do describe Sidekiq::QueuePause::PausingFetch do - describe "#unpause_queues_cmd" do - let(:queuename) { "some_queue" } - let(:config) { {queues: [queuename], strict: true} } - let(:pausing_fetch) { described_class.new(config) } + let(:queue_name) { "some_queue" } + let(:config) { {queues: [queue_name], strict: true} } + + subject(:pausing_fetch) { described_class.new(config) } + describe "#unpause_queues_cmd" do context "with Sidekiq > 6.5.6 the queues list can contain Hashes" do - let(:queue_list) { ["queue:#{queuename}", {timeout: 2}] } + let(:queue_list) { ["queue:#{queue_name}", {timeout: 2}] } before { allow(pausing_fetch).to receive(:queues_cmd).and_return(queue_list) } it "does not checked whether the Hash is paused" do - expect(Sidekiq::QueuePause).to receive(:paused?).with(queuename, Sidekiq::QueuePause.process_key).and_return(false) + expect(Sidekiq::QueuePause).to receive(:paused?).with(queue_name, Sidekiq::QueuePause.process_key).and_return(false) expect(pausing_fetch.unpaused_queues_cmd).to match_array(queue_list) end end context "with Sidekiq < 6.5.6 the queues list can contain an Integer" do - let(:queue_list) { ["queue:#{queuename}", 2] } + let(:queue_list) { ["queue:#{queue_name}", 2] } before { allow(pausing_fetch).to receive(:queues_cmd).and_return(queue_list) } it "does not check whether the Integer is paused" do - expect(Sidekiq::QueuePause).to receive(:paused?).with(queuename, Sidekiq::QueuePause.process_key).and_return(false) + expect(Sidekiq::QueuePause).to receive(:paused?).with(queue_name, Sidekiq::QueuePause.process_key).and_return(false) expect(pausing_fetch.unpaused_queues_cmd).to match_array(queue_list) end From eed2f7a5379ec8ded0318058224d8c2b9702b62c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Vortmann?= <57185+jvortmann@users.noreply.github.com> Date: Thu, 29 Feb 2024 14:16:40 -0300 Subject: [PATCH 2/8] Fix `nil` pointer when requeue a unit of work Since https://github.com/sidekiq/sidekiq/commit/67daa7a4 the `UnitOfWork` changed its interface to include `config`. The `config` is not returned from the `brpop` method, and therefore creating a `UnitOfWork` with `*work` passed `config` as `nil`. This is causing `nil` pointers on requeue because the method uses `config.redis` to perform the `rpush`. This change not only fixes that as brings the latest code from `sidekiq` that also deals with some timeouts (not tested in this change). Except from the timeout part, the rest is tested here. --- Change-type: fix See-also: https://github.com/sidekiq/sidekiq/commit/67daa7a4 See-also: https://github.com/sidekiq/sidekiq/blob/main/lib/sidekiq/fetch.rb#L47 --- lib/sidekiq-queue-pause.rb | 4 ++-- spec/sidekiq-queue-pause_spec.rb | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/lib/sidekiq-queue-pause.rb b/lib/sidekiq-queue-pause.rb index 56be63c..05d84b9 100644 --- a/lib/sidekiq-queue-pause.rb +++ b/lib/sidekiq-queue-pause.rb @@ -53,8 +53,8 @@ def retrieve_work end def retrieve_work_for_queues(qcmd) - work = Sidekiq.redis { |conn| conn.brpop(*qcmd) } - UnitOfWork.new(*work) if work + queue, job = redis { |conn| conn.blocking_call(conn.read_timeout + TIMEOUT, "brpop", *qcmd, TIMEOUT) } + UnitOfWork.new(queue, job, config) if queue end # Returns the list of unpause queue names. diff --git a/spec/sidekiq-queue-pause_spec.rb b/spec/sidekiq-queue-pause_spec.rb index b80a4c6..1275e87 100644 --- a/spec/sidekiq-queue-pause_spec.rb +++ b/spec/sidekiq-queue-pause_spec.rb @@ -32,5 +32,25 @@ end end end + + describe "reenqueueing a unit of work" do + let(:conn) { double("redis connection", read_timeout: 5, blocking_call: queue_and_work, brpop: queue_and_work) } + let(:job) { {queue: "some_queue", retry: true} } + let(:queue) { "queue:#{queue_name}" } + let(:queue_and_work) { [queue, job.to_json] } + + it "does not raise a `NoMethodError: undefined method `redis' for nil:NilClass` due to lack of `config`" do + allow(config).to receive(:redis).and_yield(conn) + allow(pausing_fetch).to receive(:redis).and_yield(conn) + + expect(described_class::UnitOfWork).to receive(:new).with(queue, job.to_json, config).and_call_original + expect(conn).to receive(:blocking_call).with(conn.read_timeout + described_class::TIMEOUT, "brpop", queue, described_class::TIMEOUT) + expect(conn).to receive(:rpush).with(queue, job.to_json) + + unit_of_work = pausing_fetch.retrieve_work_for_queues(queue) + + expect { unit_of_work.requeue }.to_not raise_error + end + end end end From 57fd90eadc02d9d807133c7021d0e9e6fa20c39c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Vortmann?= <57185+jvortmann@users.noreply.github.com> Date: Thu, 29 Feb 2024 16:25:34 -0300 Subject: [PATCH 3/8] Revert the blocking call change This caused the connection to hang without returning any work. Letting this one for later. --- Change-type: feature --- lib/sidekiq-queue-pause.rb | 3 ++- spec/sidekiq-queue-pause_spec.rb | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/sidekiq-queue-pause.rb b/lib/sidekiq-queue-pause.rb index 05d84b9..135e6dd 100644 --- a/lib/sidekiq-queue-pause.rb +++ b/lib/sidekiq-queue-pause.rb @@ -53,7 +53,8 @@ def retrieve_work end def retrieve_work_for_queues(qcmd) - queue, job = redis { |conn| conn.blocking_call(conn.read_timeout + TIMEOUT, "brpop", *qcmd, TIMEOUT) } + #queue, job = redis { |conn| conn.blocking_call(conn.read_timeout + TIMEOUT, "brpop", *qcmd, TIMEOUT) } + queue, job = redis { |conn| conn.brpop(*qcmd) } UnitOfWork.new(queue, job, config) if queue end diff --git a/spec/sidekiq-queue-pause_spec.rb b/spec/sidekiq-queue-pause_spec.rb index 1275e87..5aac350 100644 --- a/spec/sidekiq-queue-pause_spec.rb +++ b/spec/sidekiq-queue-pause_spec.rb @@ -44,7 +44,8 @@ allow(pausing_fetch).to receive(:redis).and_yield(conn) expect(described_class::UnitOfWork).to receive(:new).with(queue, job.to_json, config).and_call_original - expect(conn).to receive(:blocking_call).with(conn.read_timeout + described_class::TIMEOUT, "brpop", queue, described_class::TIMEOUT) + #expect(conn).to receive(:blocking_call).with(conn.read_timeout + described_class::TIMEOUT, "brpop", queue, described_class::TIMEOUT) + expect(conn).to receive(:brpop).with(queue) expect(conn).to receive(:rpush).with(queue, job.to_json) unit_of_work = pausing_fetch.retrieve_work_for_queues(queue) From d1a7ab1f8cbd57a2d5b477a1cdb6435f7479d3c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Vortmann?= <57185+jvortmann@users.noreply.github.com> Date: Thu, 29 Feb 2024 18:43:46 -0300 Subject: [PATCH 4/8] Call redis from Sidekiq --- Change-type: fix --- lib/sidekiq-queue-pause.rb | 2 +- spec/sidekiq-queue-pause_spec.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/sidekiq-queue-pause.rb b/lib/sidekiq-queue-pause.rb index 135e6dd..11f1131 100644 --- a/lib/sidekiq-queue-pause.rb +++ b/lib/sidekiq-queue-pause.rb @@ -54,7 +54,7 @@ def retrieve_work def retrieve_work_for_queues(qcmd) #queue, job = redis { |conn| conn.blocking_call(conn.read_timeout + TIMEOUT, "brpop", *qcmd, TIMEOUT) } - queue, job = redis { |conn| conn.brpop(*qcmd) } + queue, job = Sidekiq.redis { |conn| conn.brpop(*qcmd) } UnitOfWork.new(queue, job, config) if queue end diff --git a/spec/sidekiq-queue-pause_spec.rb b/spec/sidekiq-queue-pause_spec.rb index 5aac350..3e32a52 100644 --- a/spec/sidekiq-queue-pause_spec.rb +++ b/spec/sidekiq-queue-pause_spec.rb @@ -41,7 +41,7 @@ it "does not raise a `NoMethodError: undefined method `redis' for nil:NilClass` due to lack of `config`" do allow(config).to receive(:redis).and_yield(conn) - allow(pausing_fetch).to receive(:redis).and_yield(conn) + allow(Sidekiq).to receive(:redis).and_yield(conn) expect(described_class::UnitOfWork).to receive(:new).with(queue, job.to_json, config).and_call_original #expect(conn).to receive(:blocking_call).with(conn.read_timeout + described_class::TIMEOUT, "brpop", queue, described_class::TIMEOUT) From b74364c215145c7af6a1d0b5e9278e1bc4bd07d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Vortmann?= <57185+jvortmann@users.noreply.github.com> Date: Fri, 1 Mar 2024 09:16:42 -0300 Subject: [PATCH 5/8] Inject Sidekiq as config --- Change-type: feature --- lib/sidekiq-queue-pause.rb | 2 +- spec/sidekiq-queue-pause_spec.rb | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/sidekiq-queue-pause.rb b/lib/sidekiq-queue-pause.rb index 11f1131..c98c818 100644 --- a/lib/sidekiq-queue-pause.rb +++ b/lib/sidekiq-queue-pause.rb @@ -55,7 +55,7 @@ def retrieve_work def retrieve_work_for_queues(qcmd) #queue, job = redis { |conn| conn.blocking_call(conn.read_timeout + TIMEOUT, "brpop", *qcmd, TIMEOUT) } queue, job = Sidekiq.redis { |conn| conn.brpop(*qcmd) } - UnitOfWork.new(queue, job, config) if queue + UnitOfWork.new(queue, job, Sidekiq) if queue end # Returns the list of unpause queue names. diff --git a/spec/sidekiq-queue-pause_spec.rb b/spec/sidekiq-queue-pause_spec.rb index 3e32a52..ffa46a0 100644 --- a/spec/sidekiq-queue-pause_spec.rb +++ b/spec/sidekiq-queue-pause_spec.rb @@ -40,10 +40,9 @@ let(:queue_and_work) { [queue, job.to_json] } it "does not raise a `NoMethodError: undefined method `redis' for nil:NilClass` due to lack of `config`" do - allow(config).to receive(:redis).and_yield(conn) allow(Sidekiq).to receive(:redis).and_yield(conn) - expect(described_class::UnitOfWork).to receive(:new).with(queue, job.to_json, config).and_call_original + expect(described_class::UnitOfWork).to receive(:new).with(queue, job.to_json, Sidekiq).and_call_original #expect(conn).to receive(:blocking_call).with(conn.read_timeout + described_class::TIMEOUT, "brpop", queue, described_class::TIMEOUT) expect(conn).to receive(:brpop).with(queue) expect(conn).to receive(:rpush).with(queue, job.to_json) From c07c8c91f890508a39864966df81f0c1025ed2f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Vortmann?= <57185+jvortmann@users.noreply.github.com> Date: Mon, 4 Mar 2024 13:46:49 -0300 Subject: [PATCH 6/8] Make sure config is an instance of config and not a Hash --- Change-type: fix --- README.md | 2 +- spec/sidekiq-queue-pause_spec.rb | 25 ++++++++++++++++++++----- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 9bddb80..0190e55 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ Initializer: ```ruby Sidekiq.configure_server do |config| - Sidekiq.options[:fetch] = Sidekiq::QueuePause::PausingFetch.new(Sidekiq.options) + Sidekiq.options[:fetch] = Sidekiq::QueuePause::PausingFetch.new(Sidekiq) # Optionally, you may set some unique key identifying the # Sidekiq process you want to control. This (server) process will diff --git a/spec/sidekiq-queue-pause_spec.rb b/spec/sidekiq-queue-pause_spec.rb index ffa46a0..e735298 100644 --- a/spec/sidekiq-queue-pause_spec.rb +++ b/spec/sidekiq-queue-pause_spec.rb @@ -3,10 +3,29 @@ describe Sidekiq::QueuePause do describe Sidekiq::QueuePause::PausingFetch do let(:queue_name) { "some_queue" } - let(:config) { {queues: [queue_name], strict: true} } + let(:logger) { double("logger") } + let(:job) { {queue: "some_queue", retry: true} } + let(:queue) { "queue:#{queue_name}" } + let(:queue_and_work) { [queue, job.to_json] } + let(:conn) { double("redis connection", read_timeout: 5, blocking_call: queue_and_work, brpop: queue_and_work) } + let(:config) { OpenStruct.new(queues: [queue_name], strict: true, logger: logger, redis: conn) } subject(:pausing_fetch) { described_class.new(config) } + describe "instance methods from Component" do + it "responds to `logger`" do + expect(pausing_fetch).to respond_to(:logger) + end + + it "responds to `redis`" do + expect(pausing_fetch).to respond_to(:redis) + end + + it "config is not a `#{Hash}`" do + expect(pausing_fetch.config).not_to be_a(Hash) + end + end + describe "#unpause_queues_cmd" do context "with Sidekiq > 6.5.6 the queues list can contain Hashes" do let(:queue_list) { ["queue:#{queue_name}", {timeout: 2}] } @@ -34,10 +53,6 @@ end describe "reenqueueing a unit of work" do - let(:conn) { double("redis connection", read_timeout: 5, blocking_call: queue_and_work, brpop: queue_and_work) } - let(:job) { {queue: "some_queue", retry: true} } - let(:queue) { "queue:#{queue_name}" } - let(:queue_and_work) { [queue, job.to_json] } it "does not raise a `NoMethodError: undefined method `redis' for nil:NilClass` due to lack of `config`" do allow(Sidekiq).to receive(:redis).and_yield(conn) From d2e9322e955bce4bfef8227a08c3d16cb43ebc05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Vortmann?= <57185+jvortmann@users.noreply.github.com> Date: Mon, 4 Mar 2024 13:52:20 -0300 Subject: [PATCH 7/8] Access `redis` and `config` directly from the Component module --- Change-type: feature --- lib/sidekiq-queue-pause.rb | 4 ++-- spec/sidekiq-queue-pause_spec.rb | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/sidekiq-queue-pause.rb b/lib/sidekiq-queue-pause.rb index c98c818..135e6dd 100644 --- a/lib/sidekiq-queue-pause.rb +++ b/lib/sidekiq-queue-pause.rb @@ -54,8 +54,8 @@ def retrieve_work def retrieve_work_for_queues(qcmd) #queue, job = redis { |conn| conn.blocking_call(conn.read_timeout + TIMEOUT, "brpop", *qcmd, TIMEOUT) } - queue, job = Sidekiq.redis { |conn| conn.brpop(*qcmd) } - UnitOfWork.new(queue, job, Sidekiq) if queue + queue, job = redis { |conn| conn.brpop(*qcmd) } + UnitOfWork.new(queue, job, config) if queue end # Returns the list of unpause queue names. diff --git a/spec/sidekiq-queue-pause_spec.rb b/spec/sidekiq-queue-pause_spec.rb index e735298..52afa91 100644 --- a/spec/sidekiq-queue-pause_spec.rb +++ b/spec/sidekiq-queue-pause_spec.rb @@ -55,9 +55,9 @@ describe "reenqueueing a unit of work" do it "does not raise a `NoMethodError: undefined method `redis' for nil:NilClass` due to lack of `config`" do - allow(Sidekiq).to receive(:redis).and_yield(conn) + expect(config).to receive(:redis).and_yield(conn).twice #one for fetch, one for requeue - expect(described_class::UnitOfWork).to receive(:new).with(queue, job.to_json, Sidekiq).and_call_original + expect(described_class::UnitOfWork).to receive(:new).with(queue, job.to_json, config).and_call_original #expect(conn).to receive(:blocking_call).with(conn.read_timeout + described_class::TIMEOUT, "brpop", queue, described_class::TIMEOUT) expect(conn).to receive(:brpop).with(queue) expect(conn).to receive(:rpush).with(queue, job.to_json) From 8196e130c194033b6e41d7d913c68f2c7f8c431b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Vortmann?= <57185+jvortmann@users.noreply.github.com> Date: Thu, 29 Feb 2024 14:17:56 -0300 Subject: [PATCH 8/8] Bump version to `v0.1.2` and update some dependencies --- Change-type: deps --- Gemfile.lock | 15 +++++++++------ sidekiq-queue-pause.gemspec | 2 +- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index 55ae151..2abf927 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - sidekiq-queue-pause (0.1.0) + sidekiq-queue-pause (0.1.2) sidekiq (>= 6.0, < 7.0) GEM @@ -13,7 +13,7 @@ GEM backport (1.2.0) benchmark (0.2.1) coderay (1.1.3) - connection_pool (2.3.0) + connection_pool (2.4.1) diff-lcs (1.5.0) docile (1.4.0) e2mmap (0.1.0) @@ -56,9 +56,11 @@ GEM lumberjack (1.2.8) method_source (1.0.0) mini_mime (1.1.2) + mini_portile2 (2.8.5) multi_xml (0.6.0) nenv (0.3.0) - nokogiri (1.14.0-x86_64-linux) + nokogiri (1.14.0) + mini_portile2 (~> 2.8.0) racc (~> 1.4) notiffany (0.1.3) nenv (~> 0.1) @@ -89,12 +91,12 @@ GEM method_source (~> 1.0) public_suffix (5.0.1) racc (1.6.2) - rack (2.2.6.2) + rack (2.2.8.1) rainbow (3.1.1) rb-fsevent (0.11.2) rb-inotify (0.10.1) ffi (~> 1.0) - redis (4.8.0) + redis (4.8.1) regexp_parser (2.6.2) reverse_markdown (2.1.1) nokogiri @@ -134,7 +136,7 @@ GEM addressable (>= 2.3.5) faraday (>= 0.17.3, < 3) shellany (0.0.1) - sidekiq (6.5.8) + sidekiq (6.5.12) connection_pool (>= 2.2.5, < 3) rack (~> 2.0) redis (>= 4.5.0, < 5) @@ -176,6 +178,7 @@ GEM webrick (~> 1.7.0) PLATFORMS + x86_64-darwin-23 x86_64-linux DEPENDENCIES diff --git a/sidekiq-queue-pause.gemspec b/sidekiq-queue-pause.gemspec index e1946e7..20d7b5c 100644 --- a/sidekiq-queue-pause.gemspec +++ b/sidekiq-queue-pause.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = "sidekiq-queue-pause" - s.version = "0.1.1" + s.version = "0.1.2" s.summary = "Pause a Sidekiq queue" s.description = "Let's you pause/unpause individual sidekiq queues." s.license = "MIT"