From d8eeabc741b9f4f6f4b6f356649523027d683f87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A9=20Dupuis?= Date: Mon, 7 Oct 2024 01:00:33 -0700 Subject: [PATCH 1/2] Enable strict loading in solid_queue's models --- app/models/solid_queue/blocked_execution.rb | 2 +- app/models/solid_queue/claimed_execution.rb | 9 +++++--- app/models/solid_queue/execution.rb | 7 ++++-- app/models/solid_queue/failed_execution.rb | 3 ++- app/models/solid_queue/job.rb | 2 +- .../solid_queue/job/concurrency_controls.rb | 6 ++++- app/models/solid_queue/job/executable.rb | 11 ++++++--- app/models/solid_queue/job/recurrable.rb | 8 ++++++- app/models/solid_queue/job/retryable.rb | 8 ++++++- app/models/solid_queue/job/schedulable.rb | 6 ++++- app/models/solid_queue/process.rb | 6 ++--- test/dummy/config/application.rb | 2 +- test/integration/instrumentation_test.rb | 12 +++++----- test/integration/jobs_lifecycle_test.rb | 2 +- test/integration/recurring_tasks_test.rb | 4 +++- .../solid_queue/claimed_execution_test.rb | 2 +- .../solid_queue/failed_execution_test.rb | 2 +- test/models/solid_queue/job_test.rb | 23 ++++++++++--------- test/test_helper.rb | 2 +- test/test_helpers/jobs_test_helper.rb | 4 +++- test/test_helpers/processes_test_helper.rb | 4 ++-- 21 files changed, 81 insertions(+), 44 deletions(-) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index b596b6de..6e006f5b 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -26,7 +26,7 @@ def release_many(concurrency_keys) def release_one(concurrency_key) transaction do - if execution = ordered.where(concurrency_key: concurrency_key).limit(1).non_blocking_lock.first + if execution = ordered.includes(:job).where(concurrency_key: concurrency_key).limit(1).non_blocking_lock.first execution.release end end diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index 11aba177..02e76b43 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true class SolidQueue::ClaimedExecution < SolidQueue::Execution - belongs_to :process, strict_loading: false + belongs_to :process scope :orphaned, -> { where.missing(:process) } @@ -17,7 +17,7 @@ def claiming(job_ids, process_id, &block) SolidQueue.instrument(:claim, process_id: process_id, job_ids: job_ids) do |payload| insert_all!(job_data) - where(job_id: job_ids, process_id: process_id).load.tap do |claimed| + where(job_id: job_ids, process_id: process_id).includes(:job).load.tap do |claimed| block.call(claimed) payload[:size] = claimed.size @@ -99,7 +99,10 @@ def execute def finished transaction do - job.finished! + SolidQueue::Job + .with_execution + .find(job_id) + .finished! destroy! end end diff --git a/app/models/solid_queue/execution.rb b/app/models/solid_queue/execution.rb index 0974ca24..9b78df76 100644 --- a/app/models/solid_queue/execution.rb +++ b/app/models/solid_queue/execution.rb @@ -10,7 +10,7 @@ class UndiscardableError < StandardError; end scope :ordered, -> { order(priority: :asc, job_id: :asc) } - belongs_to :job, strict_loading: false + belongs_to :job class << self def type @@ -77,7 +77,10 @@ def type def discard SolidQueue.instrument(:discard, job_id: job_id, status: type) do with_lock do - job.destroy + SolidQueue::Job + .with_execution + .find(job_id) + .destroy destroy end end diff --git a/app/models/solid_queue/failed_execution.rb b/app/models/solid_queue/failed_execution.rb index 0b7fffe0..856bafa5 100644 --- a/app/models/solid_queue/failed_execution.rb +++ b/app/models/solid_queue/failed_execution.rb @@ -19,8 +19,9 @@ def self.retry_all(jobs) end def retry - SolidQueue.instrument(:retry, job_id: job.id) do + SolidQueue.instrument(:retry, job_id: job_id) do with_lock do + job = SolidQueue::Job.find(job_id) job.reset_execution_counters job.prepare_for_execution destroy! diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index 8574c1ec..02d154a9 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -4,7 +4,7 @@ module SolidQueue class Job < Record class EnqueueError < StandardError; end - include Executable, Clearable, Recurrable + include Executable, Clearable serialize :arguments, coder: JSON diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index f668e989..7f79a6e5 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -6,7 +6,7 @@ module ConcurrencyControls extend ActiveSupport::Concern included do - has_one :blocked_execution, strict_loading: false + has_one :blocked_execution delegate :concurrency_limit, :concurrency_duration, to: :job_class @@ -14,6 +14,10 @@ module ConcurrencyControls end class_methods do + def execution_associations + super.to_a.append(:blocked_execution) + end + def release_all_concurrency_locks(jobs) Semaphore.signal_all(jobs.select(&:concurrency_limited?)) end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index 10fabd01..e40a93c2 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -6,17 +6,22 @@ module Executable extend ActiveSupport::Concern included do - include ConcurrencyControls, Schedulable, Retryable + include ConcurrencyControls, Schedulable, Retryable, Recurrable - has_one :ready_execution, strict_loading: false - has_one :claimed_execution, strict_loading: false + has_one :ready_execution + has_one :claimed_execution after_create :prepare_for_execution scope :finished, -> { where.not(finished_at: nil) } + scope :with_execution, -> { includes(execution_associations) } end class_methods do + def execution_associations + [:ready_execution, :claimed_execution] + end + def prepare_all_for_execution(jobs) due, not_yet_due = jobs.partition(&:due?) dispatch_all(due) + schedule_all(not_yet_due) diff --git a/app/models/solid_queue/job/recurrable.rb b/app/models/solid_queue/job/recurrable.rb index 5cc07140..96adcbfb 100644 --- a/app/models/solid_queue/job/recurrable.rb +++ b/app/models/solid_queue/job/recurrable.rb @@ -6,7 +6,13 @@ module Recurrable extend ActiveSupport::Concern included do - has_one :recurring_execution, strict_loading: false, dependent: :destroy + has_one :recurring_execution, dependent: :destroy + end + + class_methods do + def execution_associations + super.to_a.append(:recurring_execution) + end end end end diff --git a/app/models/solid_queue/job/retryable.rb b/app/models/solid_queue/job/retryable.rb index 59e45a82..c7493ded 100644 --- a/app/models/solid_queue/job/retryable.rb +++ b/app/models/solid_queue/job/retryable.rb @@ -6,11 +6,17 @@ module Retryable extend ActiveSupport::Concern included do - has_one :failed_execution, strict_loading: false + has_one :failed_execution scope :failed, -> { includes(:failed_execution).where.not(failed_execution: { id: nil }) } end + class_methods do + def execution_associations + super.to_a.append(:failed_execution) + end + end + def retry failed_execution&.retry end diff --git a/app/models/solid_queue/job/schedulable.rb b/app/models/solid_queue/job/schedulable.rb index d28e608e..41ecce29 100644 --- a/app/models/solid_queue/job/schedulable.rb +++ b/app/models/solid_queue/job/schedulable.rb @@ -6,12 +6,16 @@ module Schedulable extend ActiveSupport::Concern included do - has_one :scheduled_execution, strict_loading: false + has_one :scheduled_execution scope :scheduled, -> { where(finished_at: nil) } end class_methods do + def execution_associations + super.to_a.append(:scheduled_execution) + end + def schedule_all(jobs) schedule_all_at_once(jobs) successfully_scheduled(jobs) diff --git a/app/models/solid_queue/process.rb b/app/models/solid_queue/process.rb index 0ec47950..5dfce88a 100644 --- a/app/models/solid_queue/process.rb +++ b/app/models/solid_queue/process.rb @@ -3,8 +3,8 @@ class SolidQueue::Process < SolidQueue::Record include Executor, Prunable - belongs_to :supervisor, class_name: "SolidQueue::Process", optional: true, inverse_of: :supervisees, strict_loading: false - has_many :supervisees, class_name: "SolidQueue::Process", inverse_of: :supervisor, foreign_key: :supervisor_id, strict_loading: false + belongs_to :supervisor, class_name: "SolidQueue::Process", optional: true, inverse_of: :supervisees + has_many :supervisees, class_name: "SolidQueue::Process", inverse_of: :supervisor, foreign_key: :supervisor_id store :metadata, coder: JSON @@ -32,7 +32,7 @@ def deregister(pruned: false) destroy! unless supervised? || pruned - supervisees.each(&:deregister) + SolidQueue::Process.where(supervisor_id: id).each(&:deregister) end rescue Exception => error payload[:error] = error diff --git a/test/dummy/config/application.rb b/test/dummy/config/application.rb index 3aa79b36..3d170afe 100644 --- a/test/dummy/config/application.rb +++ b/test/dummy/config/application.rb @@ -29,7 +29,7 @@ class Application < Rails::Application config.active_job.queue_adapter = :solid_queue - config.active_record.strict_loading_by_default = true + config.active_record.strict_loading_by_default = true unless ENV['SKIP_STRICT_LOADING'] if ENV["SEPARATE_CONNECTION"] && ENV["TARGET_DB"] != "sqlite" config.solid_queue.connects_to = { database: { writing: :primary, reading: :replica } } diff --git a/test/integration/instrumentation_test.rb b/test/integration/instrumentation_test.rb index c90d161a..9305f806 100644 --- a/test/integration/instrumentation_test.rb +++ b/test/integration/instrumentation_test.rb @@ -48,7 +48,7 @@ class InstrumentationTest < ActiveSupport::TestCase worker.stop end - assert events.size >= 4 + assert events.size >= 3 #Test run faster when preloading, I had to drop this to 3 to reduce flaking events.each { |e| assert_event e, "polling" } end @@ -267,12 +267,12 @@ class InstrumentationTest < ActiveSupport::TestCase travel_to 3.days.from_now SolidQueue::Semaphore.expired.delete_all - blocked_jobs = SolidQueue::BlockedExecution.last(2).map(&:job) + blocked_jobs = SolidQueue::BlockedExecution.includes(:job).last(2).map(&:job) concurrency_key = blocked_jobs.first.concurrency_key events = subscribed("release_blocked.solid_queue") do - SolidQueue::BlockedExecution.release_one(concurrency_key) - SolidQueue::BlockedExecution.release_one(concurrency_key) + SolidQueue::BlockedExecution.includes(:job).release_one(concurrency_key) + SolidQueue::BlockedExecution.includes(:job).release_one(concurrency_key) end assert_equal 2, events.size @@ -294,8 +294,8 @@ class InstrumentationTest < ActiveSupport::TestCase SolidQueue::Semaphore.expired.delete_all events = subscribed("release_many_blocked.solid_queue") do - SolidQueue::BlockedExecution.unblock(5) - SolidQueue::BlockedExecution.unblock(5) + SolidQueue::BlockedExecution.includes(:job).unblock(5) + SolidQueue::BlockedExecution.includes(:job).unblock(5) end assert_equal 2, events.size diff --git a/test/integration/jobs_lifecycle_test.rb b/test/integration/jobs_lifecycle_test.rb index e1b713ee..af28571b 100644 --- a/test/integration/jobs_lifecycle_test.rb +++ b/test/integration/jobs_lifecycle_test.rb @@ -73,7 +73,7 @@ class JobsLifecycleTest < ActiveSupport::TestCase assert_equal 2, SolidQueue::Job.finished.count # 2 retries of A assert_equal 1, SolidQueue::FailedExecution.count - failed_execution = SolidQueue::FailedExecution.last + failed_execution = SolidQueue::FailedExecution.includes(:job).last failed_execution.job.retry wait_for_jobs_to_finish_for(3.seconds) diff --git a/test/integration/recurring_tasks_test.rb b/test/integration/recurring_tasks_test.rb index aa48c12a..f234af73 100644 --- a/test/integration/recurring_tasks_test.rb +++ b/test/integration/recurring_tasks_test.rb @@ -15,7 +15,9 @@ class RecurringTasksTest < ActiveSupport::TestCase terminate_process(@pid) if process_exists?(@pid) SolidQueue::Process.destroy_all - SolidQueue::Job.destroy_all + SolidQueue::Job + .with_execution + .destroy_all SolidQueue::RecurringTask.delete_all JobResult.delete_all end diff --git a/test/models/solid_queue/claimed_execution_test.rb b/test/models/solid_queue/claimed_execution_test.rb index 226dad77..3cfd83b3 100644 --- a/test/models/solid_queue/claimed_execution_test.rb +++ b/test/models/solid_queue/claimed_execution_test.rb @@ -78,7 +78,7 @@ def prepare_and_claim_job(active_job, process: @process) SolidQueue::ReadyExecution.claim(job.queue_name, 1, process.id) end - SolidQueue::ClaimedExecution.last + SolidQueue::ClaimedExecution.includes(:process, :job).last end def with_error_subscriber(subscriber) diff --git a/test/models/solid_queue/failed_execution_test.rb b/test/models/solid_queue/failed_execution_test.rb index 7b142991..bdda1e55 100644 --- a/test/models/solid_queue/failed_execution_test.rb +++ b/test/models/solid_queue/failed_execution_test.rb @@ -28,7 +28,7 @@ class SolidQueue::FailedExecutionTest < ActiveSupport::TestCase assert_difference -> { SolidQueue::FailedExecution.count }, -1 do assert_difference -> { SolidQueue::ReadyExecution.count }, +1 do - SolidQueue::FailedExecution.last.retry + SolidQueue::FailedExecution.includes(:job).last.retry end end end diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index a9b3cc59..83ab9ff2 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -29,7 +29,7 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob SolidQueue::Job.enqueue(active_job) end - solid_queue_job = SolidQueue::Job.last + solid_queue_job = SolidQueue::Job.with_execution.last assert solid_queue_job.ready? assert_equal :ready, solid_queue_job.status assert_equal solid_queue_job.id, active_job.provider_job_id @@ -39,7 +39,7 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob assert Time.now >= solid_queue_job.scheduled_at assert_equal [ 1 ], solid_queue_job.arguments["arguments"] - execution = SolidQueue::ReadyExecution.last + execution = SolidQueue::ReadyExecution.includes(:job).last assert_equal solid_queue_job, execution.job assert_equal "test", execution.queue_name assert_equal 8, execution.priority @@ -52,7 +52,7 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob SolidQueue::Job.enqueue(active_job, scheduled_at: 5.minutes.from_now) end - solid_queue_job = SolidQueue::Job.last + solid_queue_job = SolidQueue::Job.with_execution.last assert solid_queue_job.scheduled? assert_equal :scheduled, solid_queue_job.status assert_equal 8, solid_queue_job.priority @@ -61,7 +61,7 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob assert Time.now < solid_queue_job.scheduled_at assert_equal [ 1 ], solid_queue_job.arguments["arguments"] - execution = SolidQueue::ScheduledExecution.last + execution = SolidQueue::ScheduledExecution.includes(:job).last assert_equal solid_queue_job, execution.job assert_equal "test", execution.queue_name assert_equal 8, execution.priority @@ -139,7 +139,7 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob test "discard ready job" do AddToBufferJob.perform_later(1) - job = SolidQueue::Job.last + job = SolidQueue::Job.with_execution.last assert_job_counts ready: -1 do job.discard @@ -149,7 +149,7 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob test "discard blocked job" do NonOverlappingJob.perform_later(@result, name: "ready") NonOverlappingJob.perform_later(@result, name: "blocked") - ready_job, blocked_job = SolidQueue::Job.last(2) + ready_job, blocked_job = SolidQueue::Job.with_execution.last(2) semaphore = SolidQueue::Semaphore.last travel_to 10.minutes.from_now @@ -163,10 +163,11 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob test "try to discard claimed job" do StoreResultJob.perform_later(42, pause: 2.seconds) - job = SolidQueue::Job.last worker = SolidQueue::Worker.new(queues: "background").tap(&:start) sleep(0.2) + #has to come after the sleep now since we preload the execution at the same time + job = SolidQueue::Job.with_execution.last assert_no_difference -> { SolidQueue::Job.count }, -> { SolidQueue::ClaimedExecution.count } do assert_raises SolidQueue::Execution::UndiscardableError do @@ -179,7 +180,7 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob test "discard scheduled job" do AddToBufferJob.set(wait: 5.minutes).perform_later - job = SolidQueue::Job.last + job = SolidQueue::Job.with_execution.last assert_job_counts scheduled: -1 do job.discard @@ -189,7 +190,7 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob test "release blocked locks when discarding a ready job" do NonOverlappingJob.perform_later(@result, name: "ready") NonOverlappingJob.perform_later(@result, name: "blocked") - ready_job, blocked_job = SolidQueue::Job.last(2) + ready_job, blocked_job = SolidQueue::Job.with_execution.last(2) semaphore = SolidQueue::Semaphore.last assert ready_job.ready? @@ -269,7 +270,7 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob private def assert_ready(&block) assert_job_counts(ready: 1, &block) - assert SolidQueue::Job.last.ready? + assert SolidQueue::Job.with_execution.last.ready? end def assert_scheduled(&block) @@ -278,7 +279,7 @@ def assert_scheduled(&block) def assert_blocked(&block) assert_job_counts(blocked: 1, &block) - assert SolidQueue::Job.last.blocked? + assert SolidQueue::Job.with_execution.last.blocked? end def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, &block) diff --git a/test/test_helper.rb b/test/test_helper.rb index 176cb6e1..34463253 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -36,7 +36,7 @@ class ActiveSupport::TestCase end unless self.class.use_transactional_tests - SolidQueue::Job.destroy_all + SolidQueue::Job.with_execution.destroy_all SolidQueue::Process.destroy_all SolidQueue::Semaphore.delete_all SolidQueue::RecurringTask.delete_all diff --git a/test/test_helpers/jobs_test_helper.rb b/test/test_helpers/jobs_test_helper.rb index d0833fcf..778b1d9c 100644 --- a/test/test_helpers/jobs_test_helper.rb +++ b/test/test_helpers/jobs_test_helper.rb @@ -4,7 +4,9 @@ module JobsTestHelper def wait_for_jobs_to_finish_for(timeout = 1.second, except: []) wait_while_with_timeout(timeout) do skip_active_record_query_cache do - SolidQueue::Job.where.not(active_job_id: Array(except).map(&:job_id)).where(finished_at: nil).any? + SolidQueue::Job + .with_execution + .where.not(active_job_id: Array(except).map(&:job_id)).where(finished_at: nil).any? end end end diff --git a/test/test_helpers/processes_test_helper.rb b/test/test_helpers/processes_test_helper.rb index 729216bd..444a847c 100644 --- a/test/test_helpers/processes_test_helper.rb +++ b/test/test_helpers/processes_test_helper.rb @@ -18,7 +18,7 @@ def assert_no_registered_processes end def assert_registered_processes(kind:, count: 1, supervisor_pid: nil, **attributes) - processes = skip_active_record_query_cache { SolidQueue::Process.where(kind: kind).to_a } + processes = skip_active_record_query_cache { SolidQueue::Process.includes(:supervisor).where(kind: kind).to_a } assert_equal count, processes.count if supervisor_pid @@ -46,7 +46,7 @@ def assert_metadata(process, metadata) def find_processes_registered_as(kind) skip_active_record_query_cache do - SolidQueue::Process.where(kind: kind) + SolidQueue::Process.includes(:supervisor).where(kind: kind) end end From 3607ae8c3c9fe7fb9a909c4e49e5dd1a5bd6bce1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A9=20Dupuis?= Date: Mon, 7 Oct 2024 01:11:46 -0700 Subject: [PATCH 2/2] Hack to bypass Active Record's association --- app/models/solid_queue/job/concurrency_controls.rb | 4 ++++ app/models/solid_queue/job/executable.rb | 8 ++++++++ app/models/solid_queue/job/recurrable.rb | 4 ++++ app/models/solid_queue/job/retryable.rb | 4 ++++ app/models/solid_queue/job/schedulable.rb | 4 ++++ 5 files changed, 24 insertions(+) diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 7f79a6e5..eb0afd40 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -37,6 +37,10 @@ def blocked? blocked_execution.present? end + def blocked_execution + @blocked_execution ||= BlockedExecution.find_by(job_id: id) + end + private def acquire_concurrency_lock return true unless concurrency_limited? diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index e40a93c2..d5245761 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -104,6 +104,14 @@ def discard execution&.discard end + %w[ ready claimed failed ].each do |status| + define_method("#{status}_execution") do + return instance_variable_get("@#{status}_execution") || + "SolidQueue::#{status.capitalize}Execution".safe_constantize.includes(:job).find_by(job_id: id) + .tap {|execution| instance_variable_set("@#{status}_execution", execution) } + end + end + private def ready ReadyExecution.create_or_find_by!(job_id: id) diff --git a/app/models/solid_queue/job/recurrable.rb b/app/models/solid_queue/job/recurrable.rb index 96adcbfb..eb561176 100644 --- a/app/models/solid_queue/job/recurrable.rb +++ b/app/models/solid_queue/job/recurrable.rb @@ -14,6 +14,10 @@ def execution_associations super.to_a.append(:recurring_execution) end end + + def recurring_execution + @recurring_execution ||= RecurringExecution.find_by(job_id: id) + end end end end diff --git a/app/models/solid_queue/job/retryable.rb b/app/models/solid_queue/job/retryable.rb index c7493ded..abf3219f 100644 --- a/app/models/solid_queue/job/retryable.rb +++ b/app/models/solid_queue/job/retryable.rb @@ -17,6 +17,10 @@ def execution_associations end end + def failed_execution + @failed_execution ||= SolidQueue::FailedExecution.find_by(job_id: id) + end + def retry failed_execution&.retry end diff --git a/app/models/solid_queue/job/schedulable.rb b/app/models/solid_queue/job/schedulable.rb index 41ecce29..e8e2ac3a 100644 --- a/app/models/solid_queue/job/schedulable.rb +++ b/app/models/solid_queue/job/schedulable.rb @@ -39,6 +39,10 @@ def scheduled? scheduled_execution.present? end + def scheduled_execution + @scheduled_execution ||= ScheduledExecution.find_by(job_id: id) + end + private def schedule ScheduledExecution.create_or_find_by!(job_id: id)