Skip to content

Commit

Permalink
[+] delayed_sidekiq strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
skcc321 committed Jan 26, 2023
1 parent 0ad19ab commit 6434366
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### New Features

* [#869](https://github.com/toptal/chewy/pull/869): New strategy - `delayed_sidekiq`. Allow passing `strategy: :delayed_sidekiq` option to `SomeIndex.import([1, ...], strategy: :delayed_sidekiq)`. The strategy is compatible with `update_fields` option as well. ([@skcc321][])

### Changes

### Bugs Fixed
Expand Down
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,45 @@ The default queue name is `chewy`, you can customize it in settings: `sidekiq.qu
Chewy.settings[:sidekiq] = {queue: :low}
```

#### `:delayed_sidekiq`

It accumulate ids of records to be reindexed during the latency window in redis and then does the reindex of all accumulated records at once.
The strategy is very useful in case of frequently mutated records.
It supports update_fields option, so it will try to select just enough data from the DB

There are three options that can be defined in the index:
```ruby
class CitiesIndex...
delayed_sidekiq_options latency: 3, margin: 2, reindex_wrapper: ->(&reindex) { ActiveRecord::Base.connected_to(role: :reading) { reindex.call } }

# latency - will prevent scheduling identical jobs
# margin - main purpose is to cover db replication lag by the margin
# reindex_wrapper - lambda that accepts block to wrap that reindex process AR connection block.
...
end
```

```ruby
Chewy.strategy(:delayed_sidekiq) do
City.popular.map(&:do_some_update_action!)
end
```

The default queue name is `chewy`, you can customize it in settings: `sidekiq.queue_name`
```
Chewy.settings[:sidekiq] = {queue: :low}
```

Explicit call of the reindex using :delayed_sidekiq strategy
```ruby
CitiesIndex.import([1, 2, 3], strategy: :delayed_sidekiq)
```

Explicit call of the reindex using :delayed_sidekiq strategy with :update_fields support
```ruby
CitiesIndex.import([1, 2, 3], update_fields: [:name], strategy: :delayed_sidekiq)
```

#### `:active_job`

This does the same thing as `:atomic`, but using ActiveJob. This will inherit the ActiveJob configuration settings including the `active_job.queue_adapter` setting for the environment. Patch `Chewy::Strategy::ActiveJob::Worker` for index updates improving.
Expand Down
1 change: 1 addition & 0 deletions chewy.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength

spec.add_development_dependency 'database_cleaner'
spec.add_development_dependency 'elasticsearch-extensions'
spec.add_development_dependency 'mock_redis'
spec.add_development_dependency 'rake'
spec.add_development_dependency 'rspec', '>= 3.7.0'
spec.add_development_dependency 'rspec-collection_matchers'
Expand Down
1 change: 1 addition & 0 deletions lib/chewy/strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require 'sidekiq'
require 'chewy/strategy/sidekiq'
require 'chewy/strategy/lazy_sidekiq'
require 'chewy/strategy/delayed_sidekiq'
rescue LoadError
nil
end
Expand Down
185 changes: 185 additions & 0 deletions lib/chewy/strategy/delayed_sidekiq.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
# frozen_string_literal: true

require_relative '../index'

# patch Chewy::Index with a delayed sidekiq options method
# example usage:
# class UsersIndex < Chewy::Index
# delayed_sidekiq_options latency: 10, margin: 2, reindex_wrapper: ->(&reindex) {
# ActiveRecord::Base.connected_to(role: :reading) do
# reindex.call
# end
# }
# ...
# end
Chewy::Index.define_singleton_method(:delayed_sidekiq_options) do |opts = {}|
@delayed_sidekiq_options ||= Struct.new(:latency, :margin, :reindex_wrapper, keyword_init: true).new(
latency: opts[:latency] || Chewy::Config.instance.configuration.dig(:delayed_sidekiq, :latency),
margin: opts[:margin] || Chewy::Config.instance.configuration.dig(:delayed_sidekiq, :margin),
reindex_wrapper: ->(&reindex) { reindex.call }
)
end

# patch Chewy::Index import method with :strategy option
# example usage:
# UsersIndex.import([user1.id], update_fields: [:email], strategy: :delayed_sidekiq)
Chewy::Index.define_singleton_method(:import_routine) do |*args|
*ids, options = args
if options.is_a?(Hash) && options.delete(:strategy) == :delayed_sidekiq
return if ids.empty?

return 'delayed_sidekiq supports ids only!' unless ids.all? do |id|
id.respond_to?(:to_i)
end

begin
Chewy::Strategy::DelayedSidekiq::Scheduler.new(self, ids, options).postpone
rescue StandardError => e
e.message
else
return # to match super behaviour - return nothing (means no errors)
end
else
super(*args)
end
end

module Chewy
class Strategy
class DelayedSidekiq < Atomic
DEFAULT_QUEUE = 'chewy'
DEFAULT_LATENCY = 10
DEFAULT_MARGIN = 2
KEY_PREFIX = 'chewy:delayed_sidekiq'
FALLBACK_FIELDS = 'all'
FIELDS_IDS_SEPARATOR = ';'
IDS_SEPARATOR = ','

class Worker
include ::Sidekiq::Worker

def perform(type, score, options = {})
options[:refresh] = !Chewy.disable_refresh_async if Chewy.disable_refresh_async

::Sidekiq.redis do |redis|
timechunks_key = "#{KEY_PREFIX}:#{type}:timechunks"
timechunk_keys = redis.zrangebyscore(timechunks_key, -1, score)
members = timechunk_keys.flat_map { |timechunk_key| redis.smembers(timechunk_key) }

# extract ids and fields & do the reset of records
ids, fields = extract_ids_and_fields(members)
options[:update_fields] = fields if fields

index = type.constantize
index.delayed_sidekiq_options.reindex_wrapper.call do
options.any? ? index.import!(ids, **options) : index.import!(ids)
end

redis.del(timechunk_keys)
redis.zremrangebyscore(timechunks_key, -1, score)
end
end

private

def extract_ids_and_fields(members)
ids = []
fields = []

members.each do |member|
member_ids, member_fields = member.split(FIELDS_IDS_SEPARATOR).map { |v| v.split(IDS_SEPARATOR) }
ids |= member_ids
fields |= member_fields
end

fields = nil if fields.include?(FALLBACK_FIELDS)

[ids.map(&:to_i), fields]
end
end

class Scheduler
def initialize(type, ids, options = {})
@type = type
@ids = ids
@options = options
end

def postpone
::Sidekiq.redis do |redis|
redis.sadd(timechunk_key, serialize_data)
redis.expire(timechunk_key, expire_in)

unless redis.zrank(timechunks_key, timechunk_key)
redis.zadd(timechunks_key, at, timechunk_key)
redis.expire(timechunks_key, expire_in)

::Sidekiq::Client.push(
'queue' => sidekiq_queue,
'at' => at + margin,
'class' => Chewy::Strategy::DelayedSidekiq::Worker,
'args' => [type_name, at]
)
end
end
end

private

attr_reader :type, :ids, :options

def expire_in
latency * 10 # avoid redis growing in case of dead worker
end

def at
@at ||= begin
schedule_at = latency.seconds.from_now.to_f

(schedule_at - (schedule_at % latency)).to_i
end
end

def fields
options[:update_fields] || [FALLBACK_FIELDS]
end

def timechunks_key
"#{KEY_PREFIX}:#{type_name}:timechunks"
end

def timechunk_key
"#{KEY_PREFIX}:#{type_name}:#{at}"
end

def serialize_data
[ids.join(IDS_SEPARATOR), fields.join(IDS_SEPARATOR)].join(FIELDS_IDS_SEPARATOR)
end

def type_name
type.name
end

def latency
type.delayed_sidekiq_options.latency || Chewy::Strategy::DelayedSidekiq::DEFAULT_LATENCY
end

def margin
type.delayed_sidekiq_options.margin || Chewy::Strategy::DelayedSidekiq::DEFAULT_MARGIN
end

def sidekiq_queue
Chewy.settings.dig(:sidekiq, :queue) || DEFAULT_QUEUE
end
end

def leave
@stash.each do |type, ids|
next if ids.empty?

DelayedSidekiq::Scheduler.new(type, ids).postpone
end
end
end
end
end
Loading

0 comments on commit 6434366

Please sign in to comment.