From 25534f5636779ad28e470d02e024d91a59d6d336 Mon Sep 17 00:00:00 2001 From: Anthony Truskinger Date: Tue, 14 Jan 2025 04:58:36 +0000 Subject: [PATCH] Fix stale job scheduling Our remote stale job was not enqueuing correctly from the scheduler. This is because we had no way to supply arguments to scheduled jobs and my own silly job argument validator could not handle that. Now we can supply arguments --- .../lib/baw_workers/active_job/recurring.rb | 17 +++++++++++++---- .../jobs/analysis/remote_stale_check_job.rb | 4 ++-- .../baw-workers/lib/baw_workers/resque_api.rb | 15 ++++++++------- .../analysis/remote_stale_check_job_spec.rb | 11 ++++++++++- 4 files changed, 33 insertions(+), 14 deletions(-) diff --git a/lib/gems/baw-workers/lib/baw_workers/active_job/recurring.rb b/lib/gems/baw-workers/lib/baw_workers/active_job/recurring.rb index d25e92c2..f41fe47b 100644 --- a/lib/gems/baw-workers/lib/baw_workers/active_job/recurring.rb +++ b/lib/gems/baw-workers/lib/baw_workers/active_job/recurring.rb @@ -11,9 +11,11 @@ module ActiveJob # @example # class MyJob < BawWorkers::Jobs::ApplicationJob # queue_as :default - # recurring_at '0 0 * * *' # every day at midnight - # def perform(*args) - # # do something + # recurring_at '0 0 * * *', args: [ 1 ] + # + # def perform(an_argument) + # # prints `1` every day at midnight + # puts an_argument # end # end # @@ -32,14 +34,20 @@ module ClassMethods # @!attribute [rw] recurring_cron_schedule # @return [string] The cron schedule for this recurring job. + # @!attribute [rw] recurring_cron_schedule_args + # @return [Array] The arguments the job will be scheduled with + # Sets the cron schedule for this job. # @param [String] cron_schedule the cron schedule for this job. This is a 6-star schedule. + # @param [Array] args the arguments the job will be scheduled with # @raise [ArgumentError] if cron_schedule is not a string # @return [void] - def recurring_at(cron_schedule) + def recurring_at(cron_schedule, args: []) raise ArgumentError, 'cron_schedule must be a string' unless cron_schedule.is_a?(String) + raise ArgumentError, 'args must be an array' unless args.is_a?(Array) self.recurring_cron_schedule = cron_schedule + self.recurring_cron_schedule_args = args end # override resque-scheduler's default behaviour to adapt it to work with active job. @@ -54,6 +62,7 @@ def scheduled(_queue, _klass, *args) def __setup_recurring class_attribute :recurring_cron_schedule, instance_accessor: false + class_attribute :recurring_cron_schedule_args, instance_accessor: false end end diff --git a/lib/gems/baw-workers/lib/baw_workers/jobs/analysis/remote_stale_check_job.rb b/lib/gems/baw-workers/lib/baw_workers/jobs/analysis/remote_stale_check_job.rb index bc131b49..77e23f08 100644 --- a/lib/gems/baw-workers/lib/baw_workers/jobs/analysis/remote_stale_check_job.rb +++ b/lib/gems/baw-workers/lib/baw_workers/jobs/analysis/remote_stale_check_job.rb @@ -13,7 +13,7 @@ class RemoteStaleCheckJob < BawWorkers::Jobs::ApplicationJob queue_as Settings.actions.analysis_stale_check.queue perform_expects [NilClass, Integer] - recurring_at Settings.actions.analysis_stale_check.schedule + recurring_at Settings.actions.analysis_stale_check.schedule, args: [nil] # only allow one of these to run at once. # constrained resource: don't want two of these running at once otherwise @@ -29,7 +29,7 @@ class RemoteStaleCheckJob < BawWorkers::Jobs::ApplicationJob push_message(error.message) end - def perform(min_age_seconds = nil) + def perform(min_age_seconds) # first check if we can contact the remote queue failed!('Could not connect to remote queue.') unless batch.remote_connected? diff --git a/lib/gems/baw-workers/lib/baw_workers/resque_api.rb b/lib/gems/baw-workers/lib/baw_workers/resque_api.rb index b4e6e2e0..1995e4df 100644 --- a/lib/gems/baw-workers/lib/baw_workers/resque_api.rb +++ b/lib/gems/baw-workers/lib/baw_workers/resque_api.rb @@ -42,7 +42,8 @@ def create_all_schedules BawWorkers::Config.logger_worker.info( 'rake_task:baw:worker:run_scheduler adding recurring job', job_class: job_class.name, - schedule: job_class.recurring_cron_schedule + schedule: job_class.recurring_cron_schedule, + args: job_class.recurring_cron_schedule_args ) # add the job to the resque scheduler @@ -51,7 +52,8 @@ def create_all_schedules { class: job_class.name, cron: job_class.recurring_cron_schedule, - queue: job_class.queue_name + queue: job_class.queue_name, + args: job_class.recurring_cron_schedule_args # We don't persist the schedule because we set it every time # we start up the scheduler (this very process). #persist: @@ -70,10 +72,9 @@ def clear_all_schedules # Get all currently queued jobs. # @return [Array] def jobs_queued - jobs = [] - Resque.queues.each do |queue| - jobs.push(jobs_queued_in(queue)) - end + jobs = Resque.queues.map { |queue| + jobs_queued_in(queue) + } jobs.flatten end @@ -258,7 +259,7 @@ def queues_being_worked_on def queue_names(env = BawApp.env) env_regex = Regexp.new(env) - Resque.queues.filter { |queue| queue =~ env_regex } + Resque.queues.grep(env_regex) end def clear_queues(env = BawApp.env) diff --git a/spec/lib/gems/baw_workers/jobs/analysis/remote_stale_check_job_spec.rb b/spec/lib/gems/baw_workers/jobs/analysis/remote_stale_check_job_spec.rb index 07933897..80e34ef9 100644 --- a/spec/lib/gems/baw_workers/jobs/analysis/remote_stale_check_job_spec.rb +++ b/spec/lib/gems/baw_workers/jobs/analysis/remote_stale_check_job_spec.rb @@ -13,7 +13,6 @@ ) pause_all_jobs - submit_pbs_jobs_as_held def get_last_status(expected_count) @@ -23,6 +22,16 @@ def get_last_status(expected_count) ).max_by(&:time) end + it 'can be performed later' do + BawWorkers::Jobs::Analysis::RemoteStaleCheckJob.perform_later!(nil) + + expect_enqueued_jobs(1, of_class: BawWorkers::Jobs::Analysis::RemoteStaleCheckJob) + + perform_jobs(count: 1) + + expect_performed_jobs(1, of_class: BawWorkers::Jobs::Analysis::RemoteStaleCheckJob) + end + stepwise 'can resolve stale jobs' do after do Timecop.return