Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/good_job/capsule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def start(force: false)
@notifier = GoodJob::Notifier.new(enable_listening: @configuration.enable_listen_notify, executor: @shared_executor.executor)
@poller = GoodJob::Poller.new(poll_interval: @configuration.poll_interval)
@multi_scheduler = GoodJob::MultiScheduler.from_configuration(@configuration, warm_cache_on_initialize: true)
@notifier.recipients << [@multi_scheduler, :create_thread]
@poller.recipients << [@multi_scheduler, :create_thread]
@notifier.recipients.push([@multi_scheduler, :create_thread])
@poller.recipients.push(-> { @multi_scheduler.create_thread({ fanout: true }) })

@cron_manager = GoodJob::CronManager.new(@configuration.cron_entries, start_on_initialize: true, executor: @shared_executor.executor) if @configuration.enable_cron?

Expand Down
2 changes: 2 additions & 0 deletions lib/good_job/job_performer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def name
end

# Perform the next eligible job
# @yield [Execution] Yields the execution, if one is dequeued
# @return [Object, nil] Returns job result or +nil+ if no job was found
def next
active_job_id = nil
Expand All @@ -36,6 +37,7 @@ def next
active_job_id = execution.active_job_id
performing_active_job_ids << active_job_id
@metrics.touch_execution_at
yield(execution) if block_given?
else
@metrics.increment_empty_executions
end
Expand Down
2 changes: 1 addition & 1 deletion lib/good_job/multi_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def restart(timeout: -1)
def create_thread(state = nil)
results = []

if state
if state && !state[:fanout]
schedulers.any? do |scheduler|
scheduler.create_thread(state).tap { |result| results << result }
end
Expand Down
16 changes: 10 additions & 6 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,17 @@ def restart(timeout: -1)
# Wakes a thread to allow the performer to execute a task.
# @param state [Hash, nil] Contextual information for the performer. See {JobPerformer#next?}.
# @return [Boolean, nil] Whether work was started.
#
# * +nil+ if the scheduler is unable to take new work, for example if the thread pool is shut down or at capacity.
# * +true+ if the performer started executing work.
# * +false+ if the performer decides not to attempt to execute a task based on the +state+ that is passed to it.
def create_thread(state = nil)
return nil unless executor.running?

if state
if state.present?
return false unless performer.next?(state)

fanout = state&.fetch(:fanout, nil)

if state[:count]
# When given state for multiple jobs, try to create a thread for each one.
# Return true if a thread can be created for all of them, nil if partial or none.
Expand Down Expand Up @@ -164,7 +165,7 @@ def create_thread(state = nil)
return nil unless remaining_cache_count.positive?
end

create_task(delay)
create_task(delay, fanout: fanout)

run_now ? true : nil
end
Expand Down Expand Up @@ -262,12 +263,15 @@ def create_executor
end

# @param delay [Integer]
# @param fanout [Boolean] Whether to eagerly create a 2nd execution thread if a job is found.
# @return [void]
def create_task(delay = 0)
future = Concurrent::ScheduledTask.new(delay, args: [performer], executor: executor, timer_set: timer_set) do |thr_performer|
def create_task(delay = 0, fanout: false)
future = Concurrent::ScheduledTask.new(delay, args: [self, performer], executor: executor, timer_set: timer_set) do |thr_scheduler, thr_performer|
Thread.current.name = Thread.current.name.sub("-worker-", "-thread-") if Thread.current.name
Rails.application.reloader.wrap do
thr_performer.next
thr_performer.next do |found|
thr_scheduler.create_thread({ fanout: fanout }) if found && fanout
end
end
end
future.add_observer(self, :task_observer)
Expand Down
2 changes: 1 addition & 1 deletion spec/lib/good_job/capsule_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
it 'passes the job state to the scheduler' do
scheduler = instance_double(GoodJob::Scheduler, create_thread: nil, shutdown?: true, shutdown: nil)
allow(GoodJob::Scheduler).to receive(:new).and_return(scheduler)
job_state = "STATE"
job_state = { animal: "cat" }

capsule = described_class.new
capsule.start
Expand Down
20 changes: 10 additions & 10 deletions spec/lib/good_job/poller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@

describe 'polling' do
it 'is instrumented' do
stub_const 'POLL_COUNT', Concurrent::AtomicFixnum.new(0)
latch = Concurrent::CountDownLatch.new(3)

payloads = []
callback = proc { |*args| payloads << args }

ActiveSupport::Notifications.subscribed(callback, "finished_timer_task") do
recipient = proc { |_payload| POLL_COUNT.increment }
poller = described_class.new(recipient, poll_interval: 1)
sleep_until(max: 5, increments_of: 0.5) { POLL_COUNT.value > 1 }
recipient = proc { |_payload| latch.count_down }
poller = described_class.new(recipient, poll_interval: 0.1)
latch.wait(10)
poller.shutdown
end

Expand All @@ -42,14 +42,14 @@

describe '#recipients' do
it 'polls recipients method' do
stub_const 'POLL_COUNT', Concurrent::AtomicFixnum.new(0)
recipient = proc { |_payload| POLL_COUNT.increment }
latch = Concurrent::CountDownLatch.new(3)

poller = described_class.new(recipient, poll_interval: 1)
sleep_until(max: 5, increments_of: 0.5) { POLL_COUNT.value > 2 }
poller.shutdown
recipient = proc { |_payload| latch.count_down }
poller = described_class.new(recipient, poll_interval: 0.1)

expect(latch.wait(10)).to eq true

expect(POLL_COUNT.value).to be > 2
poller.shutdown
end
end
end
33 changes: 29 additions & 4 deletions spec/lib/good_job/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -148,18 +148,18 @@
# create a thread, which causes this test to flake on JRuby.
it 'returns false if there are no threads available', :skip_if_java do
scheduler = described_class.new(GoodJob::JobPerformer.new('mice'), max_threads: 1)
scheduler.create_thread(queue_name: 'mice')
expect(scheduler.create_thread(queue_name: 'mice')).to be_nil
scheduler.create_thread({ queue_name: 'mice' })
expect(scheduler.create_thread({ queue_name: 'mice' })).to be_nil
end

it 'returns true if the state matches the performer' do
scheduler = described_class.new(GoodJob::JobPerformer.new('mice'), max_threads: 2)
expect(scheduler.create_thread(queue_name: 'mice')).to be true
expect(scheduler.create_thread({ queue_name: 'mice' })).to be true
end

it 'returns false if the state does not match the performer' do
scheduler = described_class.new(GoodJob::JobPerformer.new('mice'), max_threads: 2)
expect(scheduler.create_thread(queue_name: 'elephant')).to be false
expect(scheduler.create_thread({ queue_name: 'elephant' })).to be false
end

it 'uses state[:count] to create multiple threads' do
Expand All @@ -171,6 +171,31 @@
expect(result).to be true
expect(scheduler).to have_received(:create_task).exactly(10).times
end

it 'uses fanout:true to eagerly create threads', :skip_if_java do
job_performer = GoodJob::JobPerformer.new("*")

# Engage all of the threads, then hold them until assertions are finished
barrier = Concurrent::CyclicBarrier.new(4)
finish_event = Concurrent::Event.new

allow(job_performer).to receive(:next_at).and_return([])
allow(job_performer).to receive(:next) do |&block|
next if finish_event.set?

block.call(true)
barrier.wait(10)
finish_event.wait(10)
end

scheduler = described_class.new(job_performer, max_threads: 3)
scheduler.create_thread({ fanout: true })

expect(barrier.wait(10)).to eq true

finish_event.set
scheduler.shutdown
end
end

describe '#stats' do
Expand Down