Skip to content

Add support for multiple throttling strategies #623

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
11 changes: 7 additions & 4 deletions app/channels/turbo/streams/broadcasts.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ def broadcast_prepend_later_to(*streamables, **opts)
broadcast_action_later_to(*streamables, action: :prepend, **opts)
end

def broadcast_refresh_later_to(*streamables, request_id: Turbo.current_request_id, **opts)
refresh_debouncer_for(*streamables, request_id: request_id).debounce do
def broadcast_refresh_later_to(*streamables, request_id: Turbo.current_request_id, throttle_with: Turbo.current_throttler, **opts)
refresh_throttler_for(*streamables, request_id: request_id, throttle_with: throttle_with).throttle do
Turbo::Streams::BroadcastStreamJob.perform_later stream_name_from(streamables), content: turbo_stream_refresh_tag(request_id: request_id, **opts)
end
end
Expand All @@ -91,8 +91,11 @@ def broadcast_stream_to(*streamables, content:)
ActionCable.server.broadcast stream_name_from(streamables), content
end

def refresh_debouncer_for(*streamables, request_id: nil) # :nodoc:
Turbo::ThreadDebouncer.for("turbo-refresh-debouncer-#{stream_name_from(streamables.including(request_id))}")
def refresh_throttler_for(*streamables, throttle_with: nil, request_id: nil) # :nodoc:
Turbo::ThreadThrottler.for(
"turbo-refresh-throttler-#{stream_name_from(streamables.including(request_id))}",
throttler: throttle_with
)
end

def broadcast_morph_to(*streamables, **opts)
Expand Down
52 changes: 42 additions & 10 deletions app/models/concerns/turbo/broadcastable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,41 @@
# after a new clearance is created. All clients subscribed to this stream will refresh the page to reflect
# the changes.
#
# When broadcasting page refreshes, Turbo will automatically debounce multiple calls in a row to only broadcast the last one.
# When broadcasting page refreshes, Turbo will automatically throttle multiple calls in a row.
# This is meant for scenarios where you process records in mass. Because of the nature of such signals, it makes no sense to
# broadcast them repeatedly and individually.
#
# By default, page refreshes are throttled with a trailing-edge debouncer with a delay of 0.5 sec. This means that when
# multiple calls are made only the last one will be executed after 0.5 seconds have passed.
#
# You can change this behavior with the +throttle_with:+ option.
#
# class Import < ApplicationRecord
# # This changes the delay on the debouncer from 0.5 to 0.1 seconds
# broadcast_refreshes throttle_with: { type: :debouncer, delay 0.1 }
# end
#
# Alternatively, you can throttle with a rate limmiter which will only allow a maximum number of calls in a given interval.
#
# class Import < ApplicationRecord
# # This will broadcast at most 2 refreshes every 5 seconds
# broadcast_refreshes throttle_with: { type: :rate_limiter, max: 2, interval: 5 }
# end
#
# If `throttle_with` isn't explicitly set, Turbo will use the default, or whatever is set by `Turbo.with_throttler`.
#
# class Import < ApplicationRecord
# broadcast_refreshes
# end
#
# class ImportProcessorJob < ApplicationJob
# def perform(import)
# Turbo.with_throttler(type: :rate_limiter, max: 2, interval: 5) do
# import.process
# end
# end
# end
#
# == Suppressing broadcasts
#
# Sometimes, you need to disable broadcasts in certain scenarios. You can use <tt>.suppressing_turbo_broadcasts</tt> to create
Expand Down Expand Up @@ -206,15 +238,15 @@ def broadcasts(stream = model_name.plural, inserts_by: :append, target: broadcas
# belongs_to :board
# broadcasts_refreshes_to ->(message) { [ message.board, :messages ] }
# end
def broadcasts_refreshes_to(stream)
after_commit -> { broadcast_refresh_later_to(stream.try(:call, self) || send(stream)) }
def broadcasts_refreshes_to(stream, **opts)
after_commit -> { broadcast_refresh_later_to(stream.try(:call, self) || send(stream), **opts) }
end

# Same as <tt>#broadcasts_refreshes_to</tt>, but the designated stream for page refreshes is automatically set to
# the current model, for creates - to the model plural name, which can be overriden by passing <tt>stream</tt>.
def broadcasts_refreshes(stream = model_name.plural)
after_create_commit -> { broadcast_refresh_later_to(stream) }
after_update_commit -> { broadcast_refresh_later }
def broadcasts_refreshes(stream = model_name.plural, **opts)
after_create_commit -> { broadcast_refresh_later_to(stream, **opts) }
after_update_commit -> { broadcast_refresh_later(**opts) }
after_destroy_commit -> { broadcast_refresh }
end

Expand Down Expand Up @@ -430,13 +462,13 @@ def broadcast_prepend_later(target: broadcast_target_default, **rendering)
end

# Same as <tt>broadcast_refresh_to</tt> but run asynchronously via a <tt>Turbo::Streams::BroadcastJob</tt>.
def broadcast_refresh_later_to(*streamables)
Turbo::StreamsChannel.broadcast_refresh_later_to(*streamables, request_id: Turbo.current_request_id) unless suppressed_turbo_broadcasts?
def broadcast_refresh_later_to(*streamables, **opts)
Turbo::StreamsChannel.broadcast_refresh_later_to(*streamables, request_id: Turbo.current_request_id, **opts) unless suppressed_turbo_broadcasts?
end

# Same as <tt>#broadcast_refresh_later_to</tt>, but the designated stream is automatically set to the current model.
def broadcast_refresh_later
broadcast_refresh_later_to self
def broadcast_refresh_later(**opts)
broadcast_refresh_later_to(self, **opts)
end

# Same as <tt>broadcast_action_to</tt> but run asynchronously via a <tt>Turbo::Streams::BroadcastJob</tt>.
Expand Down
10 changes: 7 additions & 3 deletions app/models/turbo/debouncer.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
class Turbo::Debouncer
attr_accessor :cleanup
attr_reader :delay, :scheduled_task

DEFAULT_DELAY = 0.5

def initialize(delay: DEFAULT_DELAY)
def initialize(delay: DEFAULT_DELAY, cleanup: nil)
@delay = delay
@scheduled_task = nil
@cleanup = cleanup
end

def debounce(&block)
def throttle(&block)
scheduled_task&.cancel unless scheduled_task&.complete?
@scheduled_task = Concurrent::ScheduledTask.execute(delay, &block)
@scheduled_task = Concurrent::ScheduledTask.execute(delay) do
block.call.tap { cleanup&.call }
end
end

def wait
Expand Down
37 changes: 37 additions & 0 deletions app/models/turbo/rate_limiter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
class Turbo::RateLimiter
attr_accessor :cleanup
attr_reader :count, :max, :interval, :scheduled_task

DEFAUL_MAX = 1
DEFAULT_INTERVAL = 2

def initialize(max: DEFAUL_MAX, interval: DEFAULT_INTERVAL, cleanup: nil)
@interval = interval
@max = max
@count = Concurrent::AtomicFixnum.new(0)
@cleanup = cleanup
end

def throttle(&block)
return if count.value >= max

@count.increment
block.call

return if @scheduled_task && !@scheduled_task.complete?

@scheduled_task = Concurrent::ScheduledTask.execute(interval) do
@count.value = 0
cleanup&.call
end
end

def wait
scheduled_task&.wait(wait_timeout)
end

private
def wait_timeout
interval + 1
end
end
28 changes: 0 additions & 28 deletions app/models/turbo/thread_debouncer.rb

This file was deleted.

62 changes: 62 additions & 0 deletions app/models/turbo/thread_throttler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# A decorated debouncer that will store instances in the current thread clearing them
# after the debounced logic triggers.
class Turbo::ThreadThrottler
delegate :wait, to: :throttler

def self.for(key, throttler:)
Thread.current[key] ||= new(key, Thread.current, throttler)
end

private_class_method :new

def initialize(key, thread, throttler)
@key = key
@thread = thread
@throttler = build_throttler(throttler)
@throttler.cleanup = -> { thread[key] = nil }
end

def throttle
throttler.throttle do
yield
end
end

private
attr_reader :key, :throttler, :thread

def build_throttler(throttler)
klass, args = extract_throttler_class_and_arguments(throttler)
klass.new(**args)
end

def extract_throttler_class_and_arguments(throttler)
[
extract_throttler_class(throttler),
extract_throttler_arguments(throttler)
]
end


def extract_throttler_class(throttler)
case throttler
when Symbol
"Turbo::#{throttler.to_s.camelize}".constantize
when String
throttler.constantize
when Hash
extract_throttler_class(throttler.fetch(:type))
else
throttler
end
end

def extract_throttler_arguments(throttler)
case throttler
when Hash
throttler.except(:type)
else
{}
end
end
end
8 changes: 8 additions & 0 deletions lib/turbo-rails.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module Turbo
mattr_accessor :draw_routes, default: true

thread_mattr_accessor :current_request_id
thread_mattr_accessor :current_throttler, default: :debouncer

class << self
attr_writer :signed_stream_verifier_key
Expand All @@ -25,5 +26,12 @@ def with_request_id(request_id)
ensure
self.current_request_id = old_request_id
end

def with_throttler(throttler)
old_throttler, self.current_throttler = self.current_throttler, throttler
yield
ensure
self.current_throttler = old_throttler
end
end
end
28 changes: 28 additions & 0 deletions test/current_throttler_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
require "test_helper"
require "action_cable"

class Turbo::CurrentThrottlerTest < ActiveSupport::TestCase
test "sets the current throttler for a block of code" do
assert_equal :debouncer, Turbo.current_throttler

result = Turbo.with_throttler(:rate_limiter) do
assert_equal :rate_limiter, Turbo.current_throttler
:the_result
end

assert_equal :the_result, result
assert_equal :debouncer, Turbo.current_throttler
end

test "raised errors will raise and clear the current request id" do
assert_equal :debouncer, Turbo.current_throttler

assert_raise "Some error" do
Turbo.with_throttler(:rate_limiter) do
raise "Some error"
end
end

assert_equal :debouncer, Turbo.current_throttler
end
end
38 changes: 38 additions & 0 deletions test/models/turbo/debouncer_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
require "test_helper"

class Turbo::DebouncerTest < ActiveSupport::TestCase
test "executes just the last call and ignores all previous ones" do
debouncer = Turbo::Debouncer.new(delay: 0.2)
calls = []

%w[a b c d e f g].each do |letter|
debouncer.throttle { calls << letter}
end

assert calls.empty?

debouncer.wait

assert_equal 1, calls.size
assert_equal "g", calls.first

debouncer.throttle { calls << "h"}
debouncer.wait

assert_equal 2, calls.size
assert_equal "h", calls.last
end

test "calls the cleanup block after it executes" do
calls = []
debouncer = Turbo::Debouncer.new(delay: 0.2, cleanup: -> { calls << :cleanup })

debouncer.throttle { calls << :first}

assert calls.empty?

debouncer.wait

assert_equal [:first, :cleanup], calls
end
end
Loading