Skip to content

Commit 1096df5

Browse files
committed
Remove sidekiq/api dependency from scheduled, run cleanup on scheduler start, #5513
1 parent 6fc6666 commit 1096df5

File tree

2 files changed

+47
-9
lines changed

2 files changed

+47
-9
lines changed

lib/sidekiq/api.rb

+14-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,17 @@
1111
require "sidekiq/metrics/query"
1212
end
1313

14+
#
15+
# Sidekiq's Data API provides a Ruby object model on top
16+
# of Sidekiq's runtime data in Redis. This API should never
17+
# be used within application code for business logic.
18+
#
19+
# The Sidekiq server process never uses this API: all data
20+
# manipulation is done directly for performance reasons to
21+
# ensure we are using Redis as efficiently as possible at
22+
# every callsite.
23+
#
24+
1425
module Sidekiq
1526
# Retrieve runtime statistics from Redis regarding
1627
# this Sidekiq cluster.
@@ -893,10 +904,12 @@ def initialize(clean_plz = true)
893904
# :nodoc:
894905
# @api private
895906
def cleanup
907+
# don't run cleanup more than once per minute
896908
return 0 unless Sidekiq.redis { |conn| conn.set("process_cleanup", "1", nx: true, ex: 60) }
909+
897910
count = 0
898911
Sidekiq.redis do |conn|
899-
procs = conn.sscan_each("processes").to_a.sort
912+
procs = conn.sscan_each("processes").to_a
900913
heartbeats = conn.pipelined { |pipeline|
901914
procs.each do |key|
902915
pipeline.hget(key, "info")

lib/sidekiq/scheduled.rb

+33-8
Original file line numberDiff line numberDiff line change
@@ -190,22 +190,47 @@ def process_count
190190
pcount
191191
end
192192

193-
def initial_wait
194-
# periodically clean out the `processes` set in Redis which can collect
195-
# references to dead processes over time. The process count affects how
196-
# often we scan for scheduled jobs.
197-
ps = Sidekiq::ProcessSet.new(false)
198-
ps.cleanup if rand(1000) % 10 == 0 # only cleanup 10% of the time
193+
# A copy of Sidekiq::ProcessSet#cleanup because server
194+
# should never depend on sidekiq/api.
195+
def cleanup
196+
# don't run cleanup more than once per minute
197+
return 0 unless Sidekiq.redis { |conn| conn.set("process_cleanup", "1", nx: true, ex: 60) }
199198

200-
# Have all processes sleep between 5-15 seconds. 10 seconds
201-
# to give time for the heartbeat to register (if the poll interval is going to be calculated by the number
199+
count = 0
200+
Sidekiq.redis do |conn|
201+
procs = conn.sscan_each("processes").to_a
202+
heartbeats = conn.pipelined { |pipeline|
203+
procs.each do |key|
204+
pipeline.hget(key, "info")
205+
end
206+
}
207+
208+
# the hash named key has an expiry of 60 seconds.
209+
# if it's not found, that means the process has not reported
210+
# in to Redis and probably died.
211+
to_prune = procs.select.with_index { |proc, i|
212+
heartbeats[i].nil?
213+
}
214+
count = conn.srem("processes", to_prune) unless to_prune.empty?
215+
end
216+
count
217+
end
218+
219+
def initial_wait
220+
# Have all processes sleep between 5-15 seconds. 10 seconds to give time for
221+
# the heartbeat to register (if the poll interval is going to be calculated by the number
202222
# of workers), and 5 random seconds to ensure they don't all hit Redis at the same time.
203223
total = 0
204224
total += INITIAL_WAIT unless @config[:poll_interval_average]
205225
total += (5 * rand)
206226

207227
@sleeper.pop(total)
208228
rescue Timeout::Error
229+
ensure
230+
# periodically clean out the `processes` set in Redis which can collect
231+
# references to dead processes over time. The process count affects how
232+
# often we scan for scheduled jobs.
233+
cleanup
209234
end
210235
end
211236
end

0 commit comments

Comments
 (0)