diff --git a/lib/gems/baw-workers/lib/baw_workers/active_job/recurring.rb b/lib/gems/baw-workers/lib/baw_workers/active_job/recurring.rb index d25e92c2..f41fe47b 100644 --- a/lib/gems/baw-workers/lib/baw_workers/active_job/recurring.rb +++ b/lib/gems/baw-workers/lib/baw_workers/active_job/recurring.rb @@ -11,9 +11,11 @@ module ActiveJob # @example # class MyJob < BawWorkers::Jobs::ApplicationJob # queue_as :default - # recurring_at '0 0 * * *' # every day at midnight - # def perform(*args) - # # do something + # recurring_at '0 0 * * *', args: [ 1 ] + # + # def perform(an_argument) + # # prints `1` every day at midnight + # puts an_argument # end # end # @@ -32,14 +34,20 @@ module ClassMethods # @!attribute [rw] recurring_cron_schedule # @return [string] The cron schedule for this recurring job. + # @!attribute [rw] recurring_cron_schedule_args + # @return [Array] The arguments the job will be scheduled with + # Sets the cron schedule for this job. # @param [String] cron_schedule the cron schedule for this job. This is a 6-star schedule. + # @param [Array] args the arguments the job will be scheduled with # @raise [ArgumentError] if cron_schedule is not a string # @return [void] - def recurring_at(cron_schedule) + def recurring_at(cron_schedule, args: []) raise ArgumentError, 'cron_schedule must be a string' unless cron_schedule.is_a?(String) + raise ArgumentError, 'args must be an array' unless args.is_a?(Array) self.recurring_cron_schedule = cron_schedule + self.recurring_cron_schedule_args = args end # override resque-scheduler's default behaviour to adapt it to work with active job. @@ -54,6 +62,7 @@ def scheduled(_queue, _klass, *args) def __setup_recurring class_attribute :recurring_cron_schedule, instance_accessor: false + class_attribute :recurring_cron_schedule_args, instance_accessor: false end end diff --git a/lib/gems/baw-workers/lib/baw_workers/jobs/analysis/remote_stale_check_job.rb b/lib/gems/baw-workers/lib/baw_workers/jobs/analysis/remote_stale_check_job.rb index bc131b49..77e23f08 100644 --- a/lib/gems/baw-workers/lib/baw_workers/jobs/analysis/remote_stale_check_job.rb +++ b/lib/gems/baw-workers/lib/baw_workers/jobs/analysis/remote_stale_check_job.rb @@ -13,7 +13,7 @@ class RemoteStaleCheckJob < BawWorkers::Jobs::ApplicationJob queue_as Settings.actions.analysis_stale_check.queue perform_expects [NilClass, Integer] - recurring_at Settings.actions.analysis_stale_check.schedule + recurring_at Settings.actions.analysis_stale_check.schedule, args: [nil] # only allow one of these to run at once. # constrained resource: don't want two of these running at once otherwise @@ -29,7 +29,7 @@ class RemoteStaleCheckJob < BawWorkers::Jobs::ApplicationJob push_message(error.message) end - def perform(min_age_seconds = nil) + def perform(min_age_seconds) # first check if we can contact the remote queue failed!('Could not connect to remote queue.') unless batch.remote_connected? diff --git a/lib/gems/baw-workers/lib/baw_workers/resque_api.rb b/lib/gems/baw-workers/lib/baw_workers/resque_api.rb index b4e6e2e0..1995e4df 100644 --- a/lib/gems/baw-workers/lib/baw_workers/resque_api.rb +++ b/lib/gems/baw-workers/lib/baw_workers/resque_api.rb @@ -42,7 +42,8 @@ def create_all_schedules BawWorkers::Config.logger_worker.info( 'rake_task:baw:worker:run_scheduler adding recurring job', job_class: job_class.name, - schedule: job_class.recurring_cron_schedule + schedule: job_class.recurring_cron_schedule, + args: job_class.recurring_cron_schedule_args ) # add the job to the resque scheduler @@ -51,7 +52,8 @@ def create_all_schedules { class: job_class.name, cron: job_class.recurring_cron_schedule, - queue: job_class.queue_name + queue: job_class.queue_name, + args: job_class.recurring_cron_schedule_args # We don't persist the schedule because we set it every time # we start up the scheduler (this very process). #persist: @@ -70,10 +72,9 @@ def clear_all_schedules # Get all currently queued jobs. # @return [Array] def jobs_queued - jobs = [] - Resque.queues.each do |queue| - jobs.push(jobs_queued_in(queue)) - end + jobs = Resque.queues.map { |queue| + jobs_queued_in(queue) + } jobs.flatten end @@ -258,7 +259,7 @@ def queues_being_worked_on def queue_names(env = BawApp.env) env_regex = Regexp.new(env) - Resque.queues.filter { |queue| queue =~ env_regex } + Resque.queues.grep(env_regex) end def clear_queues(env = BawApp.env) diff --git a/spec/lib/gems/baw_workers/jobs/analysis/remote_stale_check_job_spec.rb b/spec/lib/gems/baw_workers/jobs/analysis/remote_stale_check_job_spec.rb index 07933897..80e34ef9 100644 --- a/spec/lib/gems/baw_workers/jobs/analysis/remote_stale_check_job_spec.rb +++ b/spec/lib/gems/baw_workers/jobs/analysis/remote_stale_check_job_spec.rb @@ -13,7 +13,6 @@ ) pause_all_jobs - submit_pbs_jobs_as_held def get_last_status(expected_count) @@ -23,6 +22,16 @@ def get_last_status(expected_count) ).max_by(&:time) end + it 'can be performed later' do + BawWorkers::Jobs::Analysis::RemoteStaleCheckJob.perform_later!(nil) + + expect_enqueued_jobs(1, of_class: BawWorkers::Jobs::Analysis::RemoteStaleCheckJob) + + perform_jobs(count: 1) + + expect_performed_jobs(1, of_class: BawWorkers::Jobs::Analysis::RemoteStaleCheckJob) + end + stepwise 'can resolve stale jobs' do after do Timecop.return