Skip to content

Commit

Permalink
Separate recurring tasks configuration into their own file
Browse files Browse the repository at this point in the history
`config/recurring.yml`. Run them using the least busy dispatcher
(largest polling interval) or a new default dispatcher if none.

This also introduces some new options for the CLI, to allow for
`--dispatch_only` and `--work_only`, and to skip recurring tasks.
  • Loading branch information
rosa committed Sep 9, 2024
1 parent d7955da commit 0aa1644
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 72 deletions.
16 changes: 14 additions & 2 deletions lib/solid_queue/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,19 @@

module SolidQueue
class Cli < Thor
class_option :config_file, type: :string, aliases: "-c", default: Configuration::DEFAULT_CONFIG_FILE_PATH, desc: "Path to config file"
class_option :config_file, type: :string, aliases: "-c",
default: Configuration::DEFAULT_CONFIG_FILE_PATH,
desc: "Path to config file",
banner: "SOLID_QUEUE_CONFIG"

class_option :recurring_schedule_file, type: :string,
default: Configuration::DEFAULT_RECURRING_SCHEDULE_FILE_PATH,
desc: "Path to recurring schedule definition",
banner: "SOLID_QUEUE_RECURRING_SCHEDULE"

class_option :dispatch_only, type: :boolean, default: false
class_option :work_only, type: :boolean, default: false
class_option :skip_recurring, type: :boolean, default: false

def self.exit_on_failure?
true
Expand All @@ -14,7 +26,7 @@ def self.exit_on_failure?
default_command :start

def start
SolidQueue::Supervisor.start(config_file: options["config_file"])
SolidQueue::Supervisor.start(**options.symbolize_keys)
end
end
end
121 changes: 78 additions & 43 deletions lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,23 @@ def instantiate
batch_size: 500,
polling_interval: 1,
concurrency_maintenance: true,
concurrency_maintenance_interval: 600,
recurring_tasks: []
concurrency_maintenance_interval: 600
}

DEFAULT_CONFIG = {
workers: [ WORKER_DEFAULTS ],
dispatchers: [ DISPATCHER_DEFAULTS ]
}
DEFAULT_CONFIG_FILE_PATH = "config/solid_queue.yml"
DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml"

def initialize(config_file: nil, **options)
@raw_config = config_from(config_file || options.presence)
def initialize(**options)
@options = options.with_defaults(default_options)
end

def configured_processes
dispatchers + workers
case
when only_work? then workers
when only_dispatch? then dispatchers
else
dispatchers + workers
end
end

def max_number_of_threads
Expand All @@ -42,9 +44,29 @@ def max_number_of_threads
end

private
attr_reader :raw_config
attr_reader :options

def default_options
{
config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH),
recurring_schedule_file: Rails.root.join(ENV["SOLID_QUEUE_RECURRING_SCHEDULE"] || DEFAULT_RECURRING_SCHEDULE_FILE_PATH),
only_work: false,
only_dispatch: false,
skip_recurring: false
}
end

def only_work?
options[:only_work]
end

DEFAULT_CONFIG_FILE_PATH = "config/solid_queue.yml"
def only_dispatch?
options[:only_dispatch]
end

def skip_recurring_tasks?
options[:skip_recurring] || only_work?
end

def workers
workers_options.flat_map do |worker_options|
Expand All @@ -55,71 +77,84 @@ def workers

def dispatchers
dispatchers_options.map do |dispatcher_options|
recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks]
Process.new :dispatcher, dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS)
end
end

def config_from(file_or_hash, env: Rails.env)
load_config_from(file_or_hash).then do |config|
config = config[env.to_sym] ? config[env.to_sym] : config
if (config.keys & DEFAULT_CONFIG.keys).any? then config
else
DEFAULT_CONFIG
end
Process.new :dispatcher, dispatcher_options.with_defaults(DISPATCHER_DEFAULTS)
end
end

def workers_options
@workers_options ||= options_from_raw_config(:workers)
@workers_options ||= processes_config.fetch(:workers, [])
.map { |options| options.dup.symbolize_keys }
end

def dispatchers_options
@dispatchers_options ||= options_from_raw_config(:dispatchers)
@dispatchers_options ||= processes_config.fetch(:dispatchers, [])
.map { |options| options.dup.symbolize_keys }
.then { |options| with_recurring_tasks(options) }
end

def options_from_raw_config(key)
Array(raw_config[key])
def with_recurring_tasks(options)
if !skip_recurring_tasks? && recurring_tasks.any?
options.sort_by! { |attrs| attrs[:polling_interval] }

if least_busy_dispatcher = options.pop
least_busy_dispatcher[:recurring_tasks] = recurring_tasks
options.push(least_busy_dispatcher)
else
[ DISPATCHER_DEFAULTS.merge(recurring_tasks: recurring_tasks) ]
end
else
options
end
end

def parse_recurring_tasks(tasks)
Array(tasks).map do |id, options|
def recurring_tasks
@recurring_tasks ||= recurring_tasks_config.map do |id, options|
RecurringTask.from_configuration(id, **options)
end.select(&:valid?)
end

def processes_config
@processes_config ||= config_from \
options.slice(:workers, :dispatchers).presence || options[:config_file],
keys: [ :workers, :dispatchers ],
fallback: { workers: [ WORKER_DEFAULTS ], dispatchers: [ DISPATCHER_DEFAULTS ] }
end

def recurring_tasks_config
@recurring_tasks ||= config_from options[:recurring_schedule_file]
end


def config_from(file_or_hash, keys: [], fallback: {}, env: Rails.env)
load_config_from(file_or_hash).then do |config|
config = config[env.to_sym] ? config[env.to_sym] : config
config = config.slice(*keys) if keys.any?

if config.empty? then fallback
else
config
end
end
end

def load_config_from(file_or_hash)
case file_or_hash
when Hash
file_or_hash.dup
when Pathname, String
load_config_from_file Pathname.new(file_or_hash)
when NilClass
load_config_from_env_location || load_config_from_default_location
{}
else
raise "Solid Queue cannot be initialized with #{file_or_hash.inspect}"
end
end

def load_config_from_env_location
if ENV["SOLID_QUEUE_CONFIG"].present?
load_config_from_file Rails.root.join(ENV["SOLID_QUEUE_CONFIG"])
end
end

def load_config_from_default_location
Rails.root.join(DEFAULT_CONFIG_FILE_PATH).then do |config_file|
config_file.exist? ? load_config_from_file(config_file) : {}
end
end

def load_config_from_file(file)
if file.exist?
ActiveSupport::ConfigurationFile.parse(file).deep_symbolize_keys
else
raise "Configuration file for Solid Queue not found in #{file}"
{}
end
end
end
Expand Down
12 changes: 12 additions & 0 deletions test/dummy/config/alternative_configuration.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
default: &default
workers:
<% 3.times do |i| %>
- queues: queue_<%= i + 1 %>
threads: <%= i + 1 %>
<% end %>

development:
<<: *default

test:
<<: *default
7 changes: 7 additions & 0 deletions test/dummy/config/empty_configuration.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
default: &default

development:
<<: *default

test:
<<: *default
1 change: 1 addition & 0 deletions test/dummy/config/invalid_configuration.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
random_wrong_key: random_value
4 changes: 4 additions & 0 deletions test/dummy/config/recurring.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
periodic_store_result:
class: StoreResultJob
args: [ 42, { status: "custom_status" } ]
schedule: every second
5 changes: 0 additions & 5 deletions test/dummy/config/solid_queue.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ default: &default
dispatchers:
- polling_interval: 1
batch_size: 500
recurring_tasks:
periodic_store_result:
class: StoreResultJob
args: [ 42, { status: "custom_status" } ]
schedule: every second

development:
<<: *default
Expand Down
3 changes: 1 addition & 2 deletions test/integration/processes_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ class ProcessesLifecycleTest < ActiveSupport::TestCase
self.use_transactional_tests = false

setup do
config_as_hash = { workers: [ { queues: :background }, { queues: :default, threads: 5 } ], dispatchers: [] }
@pid = run_supervisor_as_fork(config_as_hash)
@pid = run_supervisor_as_fork(workers: [ { queues: :background }, { queues: :default, threads: 5 } ])

wait_for_registered_processes(3, timeout: 3.second)
assert_registered_workers_for(:background, :default, supervisor_pid: @pid)
Expand Down
69 changes: 59 additions & 10 deletions test/unit/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,23 @@

class ConfigurationTest < ActiveSupport::TestCase
test "default configuration to process all queues and dispatch" do
configuration = stub_const(SolidQueue::Configuration, :DEFAULT_CONFIG_FILE_PATH, "non/existent/path") do
SolidQueue::Configuration.new
end
configuration = SolidQueue::Configuration.new(config_file: nil)

assert_equal 2, configuration.configured_processes.count
assert_processes configuration, :worker, 1, queues: "*"
assert_processes configuration, :dispatcher, 1, batch_size: SolidQueue::Configuration::DISPATCHER_DEFAULTS[:batch_size]
end

test "default configuration when config given doesn't include any configuration" do
configuration = SolidQueue::Configuration.new(random_wrong_key: :random_value)
configuration = SolidQueue::Configuration.new(config_file: config_file_path(:invalid_configuration))

assert_equal 2, configuration.configured_processes.count
assert_processes configuration, :worker, 1, queues: "*"
assert_processes configuration, :dispatcher, 1, batch_size: SolidQueue::Configuration::DISPATCHER_DEFAULTS[:batch_size]
end

test "default configuration when config given is empty" do
configuration = SolidQueue::Configuration.new(config_file: Rails.root.join("config/empty_configuration.yml"))
configuration = SolidQueue::Configuration.new(config_file: config_file_path(:empty_configuration))

assert_equal 2, configuration.configured_processes.count
assert_processes configuration, :worker, 1, queues: "*"
Expand All @@ -35,7 +33,7 @@ class ConfigurationTest < ActiveSupport::TestCase
end

test "read configuration from provided file" do
configuration = SolidQueue::Configuration.new(config_file: Rails.root.join("config/alternative_configuration.yml"))
configuration = SolidQueue::Configuration.new(config_file: config_file_path(:alternative_configuration), only_work: true)

assert 3, configuration.configured_processes.count
assert_processes configuration, :worker, 3, processes: 1, polling_interval: 0.1, queues: %w[ queue_1 queue_2 queue_3 ], threads: [ 1, 2, 3 ]
Expand All @@ -49,7 +47,7 @@ class ConfigurationTest < ActiveSupport::TestCase
assert_processes configuration, :dispatcher, 1, polling_interval: SolidQueue::Configuration::DISPATCHER_DEFAULTS[:polling_interval], batch_size: 100
assert_processes configuration, :worker, 2, queues: "background", polling_interval: 10

configuration = SolidQueue::Configuration.new(workers: [ background_worker, background_worker ])
configuration = SolidQueue::Configuration.new(workers: [ background_worker, background_worker ], skip_recurring: true)

assert_processes configuration, :dispatcher, 0
assert_processes configuration, :worker, 2
Expand All @@ -64,22 +62,73 @@ class ConfigurationTest < ActiveSupport::TestCase
background_worker = { queues: "background", polling_interval: 10, processes: 3 }
configuration = SolidQueue::Configuration.new(workers: [ background_worker ])

assert_equal 3, configuration.configured_processes.count
assert_processes configuration, :worker, 3, queues: "background", polling_interval: 10
end

test "recurring tasks configuration with one dispatcher" do
configuration = SolidQueue::Configuration.new(dispatchers: [ { polling_interval: 0.1 } ])

assert_processes configuration, :dispatcher, 1, polling_interval: 0.1

dispatcher = configuration.configured_processes.first.instantiate
assert_has_recurring_task dispatcher, key: "periodic_store_result", class_name: "StoreResultJob", schedule: "every second"
end

test "recurring tasks configuration with no dispatchers uses a default dispatcher" do
configuration = SolidQueue::Configuration.new(dispatchers: [])

assert_processes configuration, :dispatcher, 1, polling_interval: 1

dispatcher = configuration.configured_processes.first.instantiate
assert_has_recurring_task dispatcher, key: "periodic_store_result", class_name: "StoreResultJob", schedule: "every second"
end

test "recurring tasks configuration with multiple dispatchers uses the least busy one" do
configuration = SolidQueue::Configuration.new(dispatchers: [ { polling_interval: 0.1 }, { polling_interval: 0.4 }, { polling_interval: 0.2 } ])

assert_processes configuration, :dispatcher, 3, polling_interval: [ 0.1, 0.2, 0.4 ] # sorted by polling interval

dispatcher = configuration.configured_processes.last.instantiate
assert_has_recurring_task dispatcher, key: "periodic_store_result", class_name: "StoreResultJob", schedule: "every second"

dispatchers_without_recurring_tasks = configuration.configured_processes.first(2)
assert_nil dispatchers_without_recurring_tasks.map { |d| d.attributes[:recurring_tasks] }.uniq.first
end

test "no recurring tasks configuration when explicitly excluded" do
configuration = SolidQueue::Configuration.new(dispatchers: [ { polling_interval: 0.1 } ], skip_recurring: true)
assert_processes configuration, :dispatcher, 1, polling_interval: 0.1, recurring_tasks: nil
end

private
def assert_processes(configuration, kind, count, **attributes)
processes = configuration.configured_processes.select { |p| p.kind == kind }
assert_equal count, processes.size

attributes.each do |attr, expected_value|
value = processes.map { |p| p.attributes.fetch(attr) }
value = processes.map { |p| p.attributes[attr] }
unless expected_value.is_a?(Array)
value = value.first
end

assert_equal expected_value, value
if expected_value.nil?
assert_nil value
else
assert_equal expected_value, value
end
end
end

def assert_has_recurring_task(dispatcher, key:, **attributes)
assert_equal 1, dispatcher.recurring_schedule.configured_tasks.count
task = dispatcher.recurring_schedule.configured_tasks.detect { |t| t.key == key }

attributes.each do |attr, value|
assert_equal value, task.public_send(attr)
end
end

def config_file_path(name)
Rails.root.join("config/#{name}.yml")
end
end
Loading

0 comments on commit 0aa1644

Please sign in to comment.