From 8ea2cfe5694d05ed34c32918f3465defcb9b73ae Mon Sep 17 00:00:00 2001 From: Viktor Date: Fri, 31 Mar 2023 18:47:24 +0300 Subject: [PATCH] [feature] delayed_sidekiq strategy (#869) --- CHANGELOG.md | 2 + README.md | 74 +++++++ chewy.gemspec | 1 + lib/chewy/index.rb | 25 +++ lib/chewy/index/import.rb | 31 ++- lib/chewy/strategy.rb | 1 + lib/chewy/strategy/delayed_sidekiq.rb | 17 ++ .../strategy/delayed_sidekiq/scheduler.rb | 148 ++++++++++++++ lib/chewy/strategy/delayed_sidekiq/worker.rb | 52 +++++ spec/chewy/strategy/delayed_sidekiq_spec.rb | 190 ++++++++++++++++++ 10 files changed, 539 insertions(+), 2 deletions(-) create mode 100644 lib/chewy/strategy/delayed_sidekiq.rb create mode 100644 lib/chewy/strategy/delayed_sidekiq/scheduler.rb create mode 100644 lib/chewy/strategy/delayed_sidekiq/worker.rb create mode 100644 spec/chewy/strategy/delayed_sidekiq_spec.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 0823534ba..2da701b2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### New Features +* [#869](https://github.com/toptal/chewy/pull/869): New strategy - `delayed_sidekiq`. Allow passing `strategy: :delayed_sidekiq` option to `SomeIndex.import([1, ...], strategy: :delayed_sidekiq)`. The strategy is compatible with `update_fields` option as well. ([@skcc321][]) + ### Changes ### Bugs Fixed diff --git a/README.md b/README.md index 3ac60cd06..149e8ae54 100644 --- a/README.md +++ b/README.md @@ -774,6 +774,80 @@ The default queue name is `chewy`, you can customize it in settings: `sidekiq.qu Chewy.settings[:sidekiq] = {queue: :low} ``` +#### `:delayed_sidekiq` + +It accumulates the ids of records to be reindexed during the latency window in Redis and then reindexes all accumulated records at once. +The strategy is very useful for frequently mutated records. +It supports the `update_fields` option, so it will try to select just enough data from the DB. + +There are four options that can be defined in the index: +```ruby +class CitiesIndex... 
+ strategy_config delayed_sidekiq: { + latency: 3, + margin: 2, + ttl: 60 * 60 * 24, + reindex_wrapper: ->(&reindex) { + ActiveRecord::Base.connected_to(role: :reading) { reindex.call } + } + # latency - the window (in seconds) during which ids are accumulated; prevents scheduling identical jobs + # margin - extra delay (in seconds) before the job runs; its main purpose is to cover DB replication lag + # ttl - a chunk expiration time (in seconds) + # reindex_wrapper - lambda that accepts a block and wraps the reindex process, e.g. in an AR connection block. + } + + ... +end +``` + +You can also define defaults in `initializers/chewy.rb` +```ruby +Chewy.settings = { + strategy_config: { + delayed_sidekiq: { + latency: 3, + margin: 2, + ttl: 60 * 60 * 24, + reindex_wrapper: ->(&reindex) { + ActiveRecord::Base.connected_to(role: :reading) { reindex.call } + } + } + } +} + +``` +or in `config/chewy.yml` +```yaml + strategy_config: + delayed_sidekiq: + latency: 3 + margin: 2 + ttl: <%= 60 * 60 * 24 %> + # the reindex_wrapper setting cannot be defined here; use the initializer instead +``` + +You can use the strategy identically to other strategies +```ruby +Chewy.strategy(:delayed_sidekiq) do + City.popular.map(&:do_some_update_action!) +end +``` + +The default queue name is `chewy`; you can customize it in settings: `sidekiq.queue` +``` +Chewy.settings[:sidekiq] = {queue: :low} +``` + +Explicit call of the reindex using the `:delayed_sidekiq` strategy +```ruby +CitiesIndex.import([1, 2, 3], strategy: :delayed_sidekiq) +``` + +Explicit call of the reindex using the `:delayed_sidekiq` strategy with `:update_fields` support +```ruby +CitiesIndex.import([1, 2, 3], update_fields: [:name], strategy: :delayed_sidekiq) +``` + #### `:active_job` This does the same thing as `:atomic`, but using ActiveJob. This will inherit the ActiveJob configuration settings including the `active_job.queue_adapter` setting for the environment. Patch `Chewy::Strategy::ActiveJob::Worker` for index updates improving. 
diff --git a/chewy.gemspec b/chewy.gemspec index e85d12b84..0e542ab56 100644 --- a/chewy.gemspec +++ b/chewy.gemspec @@ -19,6 +19,7 @@ Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength spec.add_development_dependency 'database_cleaner' spec.add_development_dependency 'elasticsearch-extensions' + spec.add_development_dependency 'mock_redis' spec.add_development_dependency 'rake' spec.add_development_dependency 'rspec', '>= 3.7.0' spec.add_development_dependency 'rspec-collection_matchers' diff --git a/lib/chewy/index.rb b/lib/chewy/index.rb index cdb3fa055..4c63cd8e4 100644 --- a/lib/chewy/index.rb +++ b/lib/chewy/index.rb @@ -20,6 +20,10 @@ class Index pipeline raw_import refresh replication ].freeze + STRATEGY_OPTIONS = { + delayed_sidekiq: %i[latency margin ttl reindex_wrapper] + }.freeze + include Search include Actions include Aliases @@ -221,6 +225,27 @@ def default_import_options(params) params.assert_valid_keys(IMPORT_OPTIONS_KEYS) self._default_import_options = _default_import_options.merge(params) end + + def strategy_config(params = {}) + @strategy_config ||= begin # memoized: params passed on later calls are ignored + config_struct = Struct.new(*STRATEGY_OPTIONS.keys).new + + STRATEGY_OPTIONS.each_with_object(config_struct) do |(strategy, options), res| + res[strategy] = case strategy + when :delayed_sidekiq + Struct.new(*STRATEGY_OPTIONS[strategy]).new.tap do |config| + options.each do |option| + config[option] = params.dig(strategy, option) || Chewy.configuration.dig(:strategy_config, strategy, option) + end + + config[:reindex_wrapper] ||= ->(&reindex) { reindex.call } # default wrapper: just runs the reindex block + end + else + raise NotImplementedError, "Unsupported strategy: '#{strategy}'" + end + end + end + end end end end diff --git a/lib/chewy/index/import.rb b/lib/chewy/index/import.rb index d9a23aaee..cc50a4056 100644 --- a/lib/chewy/index/import.rb +++ b/lib/chewy/index/import.rb @@ -73,7 +73,7 @@ module ClassMethods # @option options [true, Integer, Hash] parallel enables parallel import processing 
with the Parallel gem, accepts the number of workers or any Parallel gem acceptable options # @return [true, false] false in case of errors ruby2_keywords def import(*args) - import_routine(*args).blank? + intercept_import_using_strategy(*args).blank? end # @!method import!(*collection, **options) @@ -84,7 +84,8 @@ module ClassMethods # # @raise [Chewy::ImportFailed] in case of errors ruby2_keywords def import!(*args) - errors = import_routine(*args) + errors = intercept_import_using_strategy(*args) + raise Chewy::ImportFailed.new(self, errors) if errors.present? true @@ -126,6 +127,32 @@ def compose(object, crutches = nil, fields: []) private + def intercept_import_using_strategy(*args) + args_clone = args.deep_dup # work on a copy so that args stay intact for import_routine + options = args_clone.extract_options! + strategy = options.delete(:strategy) + + return import_routine(*args) if strategy.blank? + + ids = args_clone.flatten + return {} if ids.blank? + return {argument: {"#{strategy} supports ids only!" => ids}} unless ids.all? do |id| + id.respond_to?(:to_i) + end + + case strategy + when :delayed_sidekiq + begin + Chewy::Strategy::DelayedSidekiq::Scheduler.new(self, ids, options).postpone + {} # success: an empty hash means no errors (errors handling convention) + rescue StandardError => e + {scheduler: {e.message => ids}} + end + else + {argument: {"unsupported strategy: '#{strategy}'" => ids}} + end + end + def import_routine(*args) return if !args.first.nil? 
&& empty_objects_or_scope?(args.first) diff --git a/lib/chewy/strategy.rb b/lib/chewy/strategy.rb index 83baadb24..a8c6c5dfa 100644 --- a/lib/chewy/strategy.rb +++ b/lib/chewy/strategy.rb @@ -8,6 +8,7 @@ require 'sidekiq' require 'chewy/strategy/sidekiq' require 'chewy/strategy/lazy_sidekiq' + require 'chewy/strategy/delayed_sidekiq' rescue LoadError nil end diff --git a/lib/chewy/strategy/delayed_sidekiq.rb b/lib/chewy/strategy/delayed_sidekiq.rb new file mode 100644 index 000000000..2c694071c --- /dev/null +++ b/lib/chewy/strategy/delayed_sidekiq.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +module Chewy + class Strategy + class DelayedSidekiq < Sidekiq + require_relative 'delayed_sidekiq/scheduler' + + def leave + @stash.each do |type, ids| + next if ids.empty? + + DelayedSidekiq::Scheduler.new(type, ids).postpone + end + end + end + end +end diff --git a/lib/chewy/strategy/delayed_sidekiq/scheduler.rb b/lib/chewy/strategy/delayed_sidekiq/scheduler.rb new file mode 100644 index 000000000..9f4bf386e --- /dev/null +++ b/lib/chewy/strategy/delayed_sidekiq/scheduler.rb @@ -0,0 +1,148 @@ +# frozen_string_literal: true + +require_relative '../../index' + +# The class is responsible for accumulating in Redis the [type, ids] +# that were requested to be reindexed during `latency` seconds. +# The reindex job is scheduled to run once the `latency` window is over (plus `margin`). +# That job reads the accumulated [type, ids] from Redis +# and reindexes all of them at once. 
+module Chewy + class Strategy + class DelayedSidekiq + require_relative 'worker' + + class Scheduler + DEFAULT_TTL = 60 * 60 * 24 # in seconds + DEFAULT_LATENCY = 10 # in seconds + DEFAULT_MARGIN = 2 # in seconds + DEFAULT_QUEUE = 'chewy' + KEY_PREFIX = 'chewy:delayed_sidekiq' + FALLBACK_FIELDS = 'all' + FIELDS_IDS_SEPARATOR = ';' + IDS_SEPARATOR = ',' + + def initialize(type, ids, options = {}) + @type = type + @ids = ids + @options = options + end + + # the diagram: + # + # inputs: + # latency == 2 + # reindex_time = Time.current + # + # Parallel OR Sequential triggers of reindex: | What is going on in reindex store (Redis): + # -------------------------------------------------------------------------------------------------- + # | + # process 1 (reindex_time): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1] + # Scheduler.new(CitiesIndex, [1]).postpone | chewy:delayed_sidekiq:CitiesIndex:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}] + # | & schedule a DelayedSidekiq::Worker at 1679347869 (at + 3) + # | it will fetch chewy:delayed_sidekiq:CitiesIndex:timechunks entries up to 1679347866 score and reindex all ids stored under the fetched keys + # | chewy:delayed_sidekiq:CitiesIndex:1679347866 + # | + # | + # process 2 (reindex_time): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2] + # Scheduler.new(CitiesIndex, [2]).postpone | chewy:delayed_sidekiq:CitiesIndex:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}] + # | & do not schedule a new worker + # | + # | + # process 1 (reindex_time + (latency - 1).seconds): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3] + # Scheduler.new(CitiesIndex, [3]).postpone | chewy:delayed_sidekiq:CitiesIndex:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}] + # | & do not schedule a new worker + # | + # | + # process 2 (reindex_time + (latency + 1).seconds): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3] + # Scheduler.new(CitiesIndex, [4]).postpone | chewy:delayed_sidekiq:CitiesIndex:1679347868 
= [4] + # | chewy:delayed_sidekiq:CitiesIndex:timechunks = [ + # | { score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"} + # | { score: 1679347868, "chewy:delayed_sidekiq:CitiesIndex:1679347868"} + # | ] + # | & schedule a DelayedSidekiq::Worker at 1679347871 (at + 3) + # | it will fetch chewy:delayed_sidekiq:CitiesIndex:timechunks entries up to 1679347868 score and reindex all ids stored under the fetched keys + # | chewy:delayed_sidekiq:CitiesIndex:1679347866 (in case of failed previous reindex), + # | chewy:delayed_sidekiq:CitiesIndex:1679347868 + def postpone + ::Sidekiq.redis do |redis| + # avoids the warning: "Redis#sadd will always return an Integer in Redis 5.0.0. Use Redis#sadd? instead" + if redis.respond_to?(:sadd?) + redis.sadd?(timechunk_key, serialize_data) + else + redis.sadd(timechunk_key, serialize_data) + end + + redis.expire(timechunk_key, ttl) + + unless redis.zrank(timechunks_key, timechunk_key) + redis.zadd(timechunks_key, at, timechunk_key) + redis.expire(timechunks_key, ttl) + + ::Sidekiq::Client.push( + 'queue' => sidekiq_queue, + 'at' => at + margin, + 'class' => Chewy::Strategy::DelayedSidekiq::Worker, + 'args' => [type_name, at] + ) + end + end + end + + private + + attr_reader :type, :ids, :options + + # returns a predictable timestamp that advances in steps of `latency` seconds; + # in other words, within each `latency`-second window it returns the same value + def at + @at ||= begin + schedule_at = latency.seconds.from_now.to_f + + (schedule_at - (schedule_at % latency)).to_i + end + end + + def fields + options[:update_fields].presence || [FALLBACK_FIELDS] + end + + def timechunks_key + "#{KEY_PREFIX}:#{type_name}:timechunks" + end + + def timechunk_key + "#{KEY_PREFIX}:#{type_name}:#{at}" + end + + def serialize_data + [ids.join(IDS_SEPARATOR), fields.join(IDS_SEPARATOR)].join(FIELDS_IDS_SEPARATOR) + end + + def type_name + type.name + end + + def latency + strategy_config.latency || DEFAULT_LATENCY + end + + def margin + strategy_config.margin || DEFAULT_MARGIN + end + + def ttl + strategy_config.ttl || 
DEFAULT_TTL + end + + def sidekiq_queue + Chewy.settings.dig(:sidekiq, :queue) || DEFAULT_QUEUE + end + + def strategy_config + type.strategy_config.delayed_sidekiq + end + end + end + end +end diff --git a/lib/chewy/strategy/delayed_sidekiq/worker.rb b/lib/chewy/strategy/delayed_sidekiq/worker.rb new file mode 100644 index 000000000..4d17a4cd1 --- /dev/null +++ b/lib/chewy/strategy/delayed_sidekiq/worker.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +module Chewy + class Strategy + class DelayedSidekiq + class Worker + include ::Sidekiq::Worker + + def perform(type, score, options = {}) # type - index class name; score - timechunk score up to which chunks are processed + options[:refresh] = !Chewy.disable_refresh_async if Chewy.disable_refresh_async + + ::Sidekiq.redis do |redis| + timechunks_key = "#{Scheduler::KEY_PREFIX}:#{type}:timechunks" + timechunk_keys = redis.zrangebyscore(timechunks_key, -1, score) # NOTE(review): may be empty if another job already consumed these chunks - confirm import!([]) and redis.del([]) are safe + members = timechunk_keys.flat_map { |timechunk_key| redis.smembers(timechunk_key) }.compact + + # extract ids and fields from the accumulated members, then reindex those records + ids, fields = extract_ids_and_fields(members) + options[:update_fields] = fields if fields + + index = type.constantize + index.strategy_config.delayed_sidekiq.reindex_wrapper.call do + options.any? ? 
index.import!(ids, **options) : index.import!(ids) + end + + redis.del(timechunk_keys) + redis.zremrangebyscore(timechunks_key, -1, score) + end + end + + private + + def extract_ids_and_fields(members) + ids = [] + fields = [] + + members.each do |member| + member_ids, member_fields = member.split(Scheduler::FIELDS_IDS_SEPARATOR).map do |v| + v.split(Scheduler::IDS_SEPARATOR) + end + ids |= member_ids + fields |= member_fields + end + + fields = nil if fields.include?(Scheduler::FALLBACK_FIELDS) + + [ids.map(&:to_i), fields] + end + end + end + end +end diff --git a/spec/chewy/strategy/delayed_sidekiq_spec.rb b/spec/chewy/strategy/delayed_sidekiq_spec.rb new file mode 100644 index 000000000..ac9f792c7 --- /dev/null +++ b/spec/chewy/strategy/delayed_sidekiq_spec.rb @@ -0,0 +1,190 @@ +require 'spec_helper' + +if defined?(Sidekiq) + require 'sidekiq/testing' + require 'mock_redis' + + describe Chewy::Strategy::DelayedSidekiq do + around do |example| + Chewy.strategy(:bypass) { example.run } + end + + before do + redis = MockRedis.new + allow(Sidekiq).to receive(:redis).and_yield(redis) + Sidekiq::Worker.clear_all + end + + before do + stub_model(:city) do + update_index('cities') { self } + end + + stub_index(:cities) do + index_scope City + end + end + + let(:city) { City.create!(name: 'hello') } + let(:other_city) { City.create!(name: 'world') } + + it 'does not trigger immediate reindex due to it`s async nature' do + expect { [city, other_city].map(&:save!) 
} + .not_to update_index(CitiesIndex, strategy: :delayed_sidekiq) + end + + it "respects 'refresh: false' options" do + allow(Chewy).to receive(:disable_refresh_async).and_return(true) + expect(CitiesIndex).to receive(:import!).with([city.id, other_city.id], refresh: false) + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id, other_city.id]) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + + context 'with default config' do + it 'does schedule a job that triggers reindex with default options' do + Timecop.freeze do + expect(Sidekiq::Client).to receive(:push).with( + hash_including( + 'queue' => 'chewy', + 'at' => (Time.current.to_i.ceil(-1) + 2.seconds).to_i, + 'class' => Chewy::Strategy::DelayedSidekiq::Worker, + 'args' => ['CitiesIndex', an_instance_of(Integer)] + ) + ).and_call_original + + expect($stdout).not_to receive(:puts) + + Sidekiq::Testing.inline! do + expect { [city, other_city].map(&:save!) } + .to update_index(CitiesIndex, strategy: :delayed_sidekiq) + .and_reindex(city, other_city).only + end + end + end + end + + context 'with custom config' do + before do + CitiesIndex.strategy_config( + delayed_sidekiq: { + reindex_wrapper: lambda { |&reindex| + puts 'hello' + reindex.call + }, + margin: 5, + latency: 60 + } + ) + end + + it 'respects :strategy_config options' do + Timecop.freeze do + expect(Sidekiq::Client).to receive(:push).with( + hash_including( + 'queue' => 'chewy', + 'at' => (60.seconds.from_now.change(sec: 0) + 5.seconds).to_i, + 'class' => Chewy::Strategy::DelayedSidekiq::Worker, + 'args' => ['CitiesIndex', an_instance_of(Integer)] + ) + ).and_call_original + + expect($stdout).to receive(:puts).with('hello') # the custom reindex_wrapper prints 'hello', so this verifies that the wrapper is invoked + + Sidekiq::Testing.inline! do + expect { [city, other_city].map(&:save!) 
} + .to update_index(CitiesIndex, strategy: :delayed_sidekiq) + .and_reindex(city, other_city).only + end + end + end + end + + context 'two reindex call within the timewindow' do + it 'accumulates all ids does the reindex one time' do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with([other_city.id, city.id]).once + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id]) + scheduler.postpone + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id]) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + + context 'one call with update_fields another one without update_fields' do + it 'does reindex of all fields' do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with([other_city.id, city.id]).once + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name']) + scheduler.postpone + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id]) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + end + + context 'both calls with different update fields' do + it 'deos reindex with union of fields' do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with([other_city.id, city.id], update_fields: %w[description name]).once + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name']) + scheduler.postpone + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id], update_fields: ['description']) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + end + end + + context 'two calls within different timewindows' do + it 'does two separate reindexes' do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with([city.id]).once + expect(CitiesIndex).to receive(:import!).with([other_city.id]).once + 
Timecop.travel(20.seconds.ago) do + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id]) + scheduler.postpone + end + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id]) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + end + + context 'first call has update_fields' do + it 'does first reindex with the expected update_fields and second without update_fields' do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with([city.id], update_fields: ['name']).once + expect(CitiesIndex).to receive(:import!).with([other_city.id]).once + Timecop.travel(20.seconds.ago) do + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name']) + scheduler.postpone + end + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id]) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + end + + context 'both calls have update_fields option' do + it 'does both reindexes with their expected update_fields option' do + Timecop.freeze do + expect(CitiesIndex).to receive(:import!).with([city.id], update_fields: ['name']).once + expect(CitiesIndex).to receive(:import!).with([other_city.id], update_fields: ['description']).once + Timecop.travel(20.seconds.ago) do + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name']) + scheduler.postpone + end + scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id], update_fields: ['description']) + scheduler.postpone + Chewy::Strategy::DelayedSidekiq::Worker.drain + end + end + end + end +end