Skip to content

Commit ac22818

Browse files
author
Felipe
committed
feat/Filter by date
1 parent bb63fab commit ac22818

File tree

2 files changed

+41
-6
lines changed

2 files changed

+41
-6
lines changed

README.md

+8
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,14 @@ ActiveJob.jobs.finished.where(job_class_name: "SomeJob")
229229
# For adapters that support filtering by worker:
230230
# All jobs in progress being run by a given worker
231231
ActiveJob.jobs.in_progress.where(worker_id: 42)
232+
233+
# Using date filters
234+
# You can filter by: enqueued_at, scheduled_at or finished_at
235+
ActiveJob.jobs.pending.where(enqueued_at: 2.days.ago)
236+
ActiveJob.jobs.pending.where(scheduled_at: Date.today)
237+
238+
date_range = (Time.parse("2024-11-01")..Time.parse("2024-12-01"))
239+
ActiveJob.jobs.finished.where(finished_at: date_range)
232240
```
233241

234242
Some examples of bulk operations:

lib/active_job/jobs_relation.rb

+33-6
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ class ActiveJob::JobsRelation
2323
include Enumerable
2424

2525
STATUSES = %i[ pending failed in_progress blocked scheduled finished ]
26-
FILTERS = %i[ queue_name job_class_name ]
26+
FILTERS = %i[ queue_name job_class_name finished_at scheduled_at enqueued_at ]
2727

28-
PROPERTIES = %i[ queue_name status offset_value limit_value job_class_name worker_id recurring_task_id ]
28+
PROPERTIES = %i[ queue_name status offset_value limit_value job_class_name worker_id recurring_task_id finished_at scheduled_at enqueued_at ]
2929
attr_reader *PROPERTIES, :default_page_size
3030

3131
delegate :last, :[], :reverse, to: :to_a
@@ -51,12 +51,15 @@ def initialize(queue_adapter: ActiveJob::Base.queue_adapter, default_page_size:
5151
# * <tt>:queue_name</tt> - To only include the jobs in the provided queue.
5252
# * <tt>:worker_id</tt> - To only include the jobs processed by the provided worker.
5353
# * <tt>:recurring_task_id</tt> - To only include the jobs corresponding to runs of a recurring task.
54-
def where(job_class_name: nil, queue_name: nil, worker_id: nil, recurring_task_id: nil)
54+
def where(job_class_name: nil, queue_name: nil, worker_id: nil, recurring_task_id: nil, finished_at: nil, scheduled_at: nil, enqueued_at: nil)
5555
# Remove nil arguments to avoid overriding parameters when concatenating +where+ clauses
5656
arguments = { job_class_name: job_class_name,
5757
queue_name: queue_name,
5858
worker_id: worker_id,
59-
recurring_task_id: recurring_task_id
59+
recurring_task_id: recurring_task_id,
60+
finished_at: finished_at,
61+
scheduled_at: scheduled_at,
62+
enqueued_at: enqueued_at
6063
}.compact.collect { |key, value| [ key, value.to_s ] }.to_h
6164

6265
clone_with **arguments
@@ -259,13 +262,37 @@ def loaded?
259262
!@loaded_jobs.nil?
260263
end
261264

262-
# Filtering for not natively supported filters is performed in memory
263265
def filter(jobs)
264266
jobs.filter { |job| satisfy_filter?(job) }
265267
end
266268

269+
def satisfy_date_filter?(filter_value, job_value)
270+
return false if job_value.nil?
271+
272+
# Treat date ranges
273+
if filter_value.include?("..")
274+
start_date, end_date = filter_value.split("..").map { |date| Time.zone.parse(date) }
275+
filter_range = (start_date..end_date)
276+
return filter_range.cover?(job_value)
277+
end
278+
279+
filter = Time.zone.parse(filter_value)
280+
job_value >= filter
281+
end
282+
267283
def satisfy_filter?(job)
268-
filters.all? { |property| public_send(property) == job.public_send(property) }
284+
filters.all? do |property|
285+
filter_value = public_send(property)
286+
job_value = job.public_send(property)
287+
288+
return satisfy_date_filter?(filter_value, job_value) if is_date_filter?(property)
289+
290+
filter_value == job_value
291+
end
292+
end
293+
294+
def is_date_filter?(property)
295+
[ :finished_at, :scheduled_at, :enqueued_at ].include?(property)
269296
end
270297

271298
def filters

0 commit comments

Comments
 (0)