diff --git a/CHANGELOG.md b/CHANGELOG.md index 0823534ba..2da701b2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### New Features +* [#869](https://github.com/toptal/chewy/pull/869): New strategy - `delayed_sidekiq`. Allow passing `strategy: :delayed_sidekiq` option to `SomeIndex.import([1, ...], strategy: :delayed_sidekiq)`. The strategy is compatible with `update_fields` option as well. ([@skcc321][]) + ### Changes ### Bugs Fixed diff --git a/README.md b/README.md index a84d0c4df..eb672bb66 100644 --- a/README.md +++ b/README.md @@ -774,6 +774,45 @@ The default queue name is `chewy`, you can customize it in settings: `sidekiq.qu Chewy.settings[:sidekiq] = {queue: :low} ``` +#### `:delayed_sidekiq` + +It accumulate ids of records to be reindexed during the latency window in redis and then does the reindex of all accumulated records at once. +The strategy is very useful in case of frequently mutated records. +It supports update_fields option, so it will try to select just enough data from the DB + +There are three options that can be defined in the index: +```ruby +class CitiesIndex... + delayed_sidekiq_options latency: 3, margin: 2, reindex_wrapper: ->(&reindex) { ActiveRecord::Base.connected_to(role: :reading) { reindex.call } } + + # latency - will prevent scheduling identical jobs + # margin - main purpose is to cover db replication lag by the margin + # reindex_wrapper - lambda that accepts block to wrap that reindex process AR connection block. + ... +end +``` + +```ruby +Chewy.strategy(:delayed_sidekiq) do + City.popular.map(&:do_some_update_action!) +end +``` + +The default queue name is `chewy`, you can customize it in settings: `sidekiq.queue_name` +``` +Chewy.settings[:sidekiq] = {queue: :low} +``` + +Explicit call of the reindex using :delayed_sidekiq strategy +```ruby +CitiesIndex.import([1, 2, 3], strategy: :delayed_sidekiq) +``` + +Explicit call of the reindex using :delayed_sidekiq strategy with :update_fields support +```ruby +CitiesIndex.import([1, 2, 3], update_fields: [:name], strategy: :delayed_sidekiq) +``` + #### `:active_job` This does the same thing as `:atomic`, but using ActiveJob. This will inherit the ActiveJob configuration settings including the `active_job.queue_adapter` setting for the environment. Patch `Chewy::Strategy::ActiveJob::Worker` for index updates improving. diff --git a/chewy.gemspec b/chewy.gemspec index e85d12b84..0e542ab56 100644 --- a/chewy.gemspec +++ b/chewy.gemspec @@ -19,6 +19,7 @@ Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength spec.add_development_dependency 'database_cleaner' spec.add_development_dependency 'elasticsearch-extensions' + spec.add_development_dependency 'mock_redis' spec.add_development_dependency 'rake' spec.add_development_dependency 'rspec', '>= 3.7.0' spec.add_development_dependency 'rspec-collection_matchers' diff --git a/lib/chewy/strategy.rb b/lib/chewy/strategy.rb index 83baadb24..a8c6c5dfa 100644 --- a/lib/chewy/strategy.rb +++ b/lib/chewy/strategy.rb @@ -8,6 +8,7 @@ require 'sidekiq' require 'chewy/strategy/sidekiq' require 'chewy/strategy/lazy_sidekiq' + require 'chewy/strategy/delayed_sidekiq' rescue LoadError nil end diff --git a/lib/chewy/strategy/delayed_sidekiq.rb b/lib/chewy/strategy/delayed_sidekiq.rb new file mode 100644 index 000000000..d3239fcca --- /dev/null +++ b/lib/chewy/strategy/delayed_sidekiq.rb @@ -0,0 +1,185 @@ +# frozen_string_literal: true + +require_relative '../index' + +# patch Chewy::Index with a delayed sidekiq options method +# example usage: +# class UsersIndex < Chewy::Index +# delayed_sidekiq_options latency: 10, margin: 2, reindex_wrapper: ->(&reindex) { +# ActiveRecord::Base.connected_to(role: :reading) do +# reindex.call +# end +# } +# ... +# end +Chewy::Index.define_singleton_method(:delayed_sidekiq_options) do |opts = {}| + @delayed_sidekiq_options ||= Struct.new(:latency, :margin, :reindex_wrapper, keyword_init: true).new( + latency: opts[:latency] || Chewy::Config.instance.configuration.dig(:delayed_sidekiq, :latency), + margin: opts[:margin] || Chewy::Config.instance.configuration.dig(:delayed_sidekiq, :margin), + reindex_wrapper: ->(&reindex) { reindex.call } + ) +end + +# patch Chewy::Index import method with :strategy option +# example usage: +# UsersIndex.import([user1.id], update_fields: [:email], strategy: :delayed_sidekiq) +Chewy::Index.define_singleton_method(:import_routine) do |*args| + *ids, options = args + if options.is_a?(Hash) && options.delete(:strategy) == :delayed_sidekiq + return if ids.empty? + + return 'delayed_sidekiq supports ids only!' unless ids.all? do |id| + id.respond_to?(:to_i) + end + + begin + Chewy::Strategy::DelayedSidekiq::Scheduler.new(self, ids, options).postpone + rescue StandardError => e + e.message + else + return # to match super behaviour - return nothing (means no errors) + end + else + super(*args) + end +end + +module Chewy + class Strategy + class DelayedSidekiq < Atomic + DEFAULT_QUEUE = 'chewy' + DEFAULT_LATENCY = 10 + DEFAULT_MARGIN = 2 + KEY_PREFIX = 'chewy:delayed_sidekiq' + FALLBACK_FIELDS = 'all' + FIELDS_IDS_SEPARATOR = ';' + IDS_SEPARATOR = ',' + + class Worker + include ::Sidekiq::Worker + + def perform(type, score, options = {}) + options[:refresh] = !Chewy.disable_refresh_async if Chewy.disable_refresh_async + + ::Sidekiq.redis do |redis| + timechunks_key = "#{KEY_PREFIX}:#{type}:timechunks" + timechunk_keys = redis.zrangebyscore(timechunks_key, -1, score) + members = timechunk_keys.flat_map { |timechunk_key| redis.smembers(timechunk_key) } + + # extract ids and fields & do the reset of records + ids, fields = extract_ids_and_fields(members) + options[:update_fields] = fields if fields + + index = type.constantize + index.delayed_sidekiq_options.reindex_wrapper.call do + options.any? ? index.import!(ids, **options) : index.import!(ids) + end + + redis.del(timechunk_keys) + redis.zremrangebyscore(timechunks_key, -1, score) + end + end + + private + + def extract_ids_and_fields(members) + ids = [] + fields = [] + + members.each do |member| + member_ids, member_fields = member.split(FIELDS_IDS_SEPARATOR).map { |v| v.split(IDS_SEPARATOR) } + ids |= member_ids + fields |= member_fields + end + + fields = nil if fields.include?(FALLBACK_FIELDS) + + [ids.map(&:to_i), fields] + end + end + + class Scheduler + def initialize(type, ids, options = {}) + @type = type + @ids = ids + @options = options + end + + def postpone + ::Sidekiq.redis do |redis| + redis.sadd(timechunk_key, serialize_data) + redis.expire(timechunk_key, expire_in) + + unless redis.zrank(timechunks_key, timechunk_key) + redis.zadd(timechunks_key, at, timechunk_key) + redis.expire(timechunks_key, expire_in) + + ::Sidekiq::Client.push( + 'queue' => sidekiq_queue, + 'at' => at + margin, + 'class' => Chewy::Strategy::DelayedSidekiq::Worker, + 'args' => [type_name, at] + ) + end + end + end + + private + + attr_reader :type, :ids, :options + + def expire_in + latency * 10 # avoid redis growing in case of dead worker + end + + def at + @at ||= begin + schedule_at = latency.seconds.from_now.to_f + + (schedule_at - (schedule_at % latency)).to_i + end + end + + def fields + options[:update_fields] || [FALLBACK_FIELDS] + end + + def timechunks_key + "#{KEY_PREFIX}:#{type_name}:timechunks" + end + + def timechunk_key + "#{KEY_PREFIX}:#{type_name}:#{at}" + end + + def serialize_data + [ids.join(IDS_SEPARATOR), fields.join(IDS_SEPARATOR)].join(FIELDS_IDS_SEPARATOR) + end + + def type_name + type.name + end + + def latency + type.delayed_sidekiq_options.latency || Chewy::Strategy::DelayedSidekiq::DEFAULT_LATENCY + end + + def margin + type.delayed_sidekiq_options.margin || Chewy::Strategy::DelayedSidekiq::DEFAULT_MARGIN + end + + def sidekiq_queue + Chewy.settings.dig(:sidekiq, :queue) || DEFAULT_QUEUE + end + end + + def leave + @stash.each do |type, ids| + next if ids.empty? + + DelayedSidekiq::Scheduler.new(type, ids).postpone + end + end + end + end +end diff --git a/spec/chewy/strategy/delayed_sidekiq_spec.rb b/spec/chewy/strategy/delayed_sidekiq_spec.rb new file mode 100644 index 000000000..adb205e7e --- /dev/null +++ b/spec/chewy/strategy/delayed_sidekiq_spec.rb @@ -0,0 +1,167 @@ +require 'spec_helper' + +if defined?(Sidekiq) + require 'sidekiq/testing' + require 'mock_redis' + + describe Chewy::Strategy::DelayedSidekiq do + around do |example| + Chewy.strategy(:bypass) { example.run } + end + + before do + redis = MockRedis.new + allow(Sidekiq).to receive(:redis).and_yield(redis) + Sidekiq::Worker.clear_all + end + + before do + stub_model(:city) do + update_index('cities') { self } + end + + stub_index(:cities) do + index_scope City + end + end + + let(:city) { City.create!(name: 'hello') } + let(:other_city) { City.create!(name: 'world') } + + specify do + expect { [city, other_city].map(&:save!) } + .not_to update_index(CitiesIndex, strategy: :delayed_sidekiq) + end + + specify do + expect(Sidekiq::Client).to receive(:push).with( + hash_including( + 'queue' => 'chewy', + 'at' => an_instance_of(Integer), + 'class' => Chewy::Strategy::DelayedSidekiq::Worker, + 'args' => ['CitiesIndex', an_instance_of(Integer)] + ) + ).and_call_original + Sidekiq::Testing.inline! do + expect { [city, other_city].map(&:save!) } + .to update_index(CitiesIndex, strategy: :delayed_sidekiq) + .and_reindex(city, other_city).only + end + end + + specify do + CitiesIndex.delayed_sidekiq_options({reindex_wrapper: ->(&reindex) { reindex.call }, margin: 1, latency: 3}) + expect(Sidekiq::Client).to receive(:push).with( + hash_including( + 'queue' => 'chewy', + 'at' => an_instance_of(Integer), + 'class' => Chewy::Strategy::DelayedSidekiq::Worker, + 'args' => ['CitiesIndex', an_instance_of(Integer)] + ) + ).and_call_original + + Sidekiq::Testing.inline! do + expect { [city, other_city].map(&:save!) } + .to update_index(CitiesIndex, strategy: :delayed_sidekiq) + .and_reindex(city, other_city).only + end + end + + specify do + allow(Chewy).to receive(:disable_refresh_async).and_return(true) + expect(CitiesIndex).to receive(:import!).with([city.id, other_city.id], refresh: false) + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id, other_city.id]) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + + specify do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with([other_city.id, city.id]).once + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id]) + scheduler.postpone + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id]) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + + specify do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with([other_city.id, city.id]).once + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name']) + scheduler.postpone + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id]) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + + specify do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with([other_city.id, city.id], update_fields: %w[description name]).once + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name']) + scheduler.postpone + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id], update_fields: ['description']) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + + specify do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with([city.id]).once + expect(CitiesIndex).to receive(:import!).with([other_city.id]).once + Timecop.travel(20.seconds.ago) do + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id]) + scheduler.postpone + end + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id]) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + + specify do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with([city.id], update_fields: ['name']).once + expect(CitiesIndex).to receive(:import!).with([other_city.id]).once + Timecop.travel(20.seconds.ago) do + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name']) + scheduler.postpone + end + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id]) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + + specify do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with([city.id], update_fields: ['name']).once + expect(CitiesIndex).to receive(:import!).with([other_city.id], update_fields: ['name']).once + Timecop.travel(20.seconds.ago) do + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name']) + scheduler.postpone + end + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id], update_fields: ['name']) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + + specify do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with([city.id], update_fields: ['name']).once + expect(CitiesIndex).to receive(:import!).with([other_city.id], update_fields: ['description']).once + Timecop.travel(20.seconds.ago) do + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name']) + scheduler.postpone + end + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id], update_fields: ['description']) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + end +end