Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ gem 'faraday'
gem 'foundation_emails'
gem 'good_job'
gem 'hashie', '~> 4.1'
gem 'hiredis'
gem 'hiredis', '~> 0.6.0'
gem 'http_accept_language'
gem 'jwt'
gem 'local_time'
Expand All @@ -50,6 +50,7 @@ gem 'rack-headers_filter'
gem 'rack-timeout', require: false
gem 'readthis'
gem 'redacted_struct'
gem 'redis', '>= 3.2.0'
gem 'redis-session-store', '>= 0.11.3'
gem 'retries'
gem 'rotp', '~> 6.1'
Expand Down
3 changes: 2 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ DEPENDENCIES
good_job
guard-rspec
hashie (~> 4.1)
hiredis
hiredis (~> 0.6.0)
http_accept_language
i18n-tasks (>= 0.9.31)
identity-hostdata!
Expand Down Expand Up @@ -769,6 +769,7 @@ DEPENDENCIES
raise-if-root
readthis
redacted_struct
redis (>= 3.2.0)
redis-session-store (>= 0.11.3)
retries
rotp (~> 6.1)
Expand Down
47 changes: 34 additions & 13 deletions app/jobs/risc_delivery_job.rb
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
class RiscDeliveryJob < ApplicationJob
queue_as :low

retry_on Faraday::TimeoutError, Faraday::ConnectionFailed, wait: :exponentially_longer
retry_on Faraday::TimeoutError,
Faraday::ConnectionFailed,
wait: :exponentially_longer,
attempts: 5
retry_on RedisRateLimiter::LimitError,
wait: :exponentially_longer,
attempts: 10

def perform(
push_notification_url:,
jwt:,
event_type:,
issuer:
issuer:,
now: Time.zone.now
)
response = faraday.post(
push_notification_url,
jwt,
'Accept' => 'application/json',
'Content-Type' => 'application/secevent+jwt',
) do |req|
req.options.context = {
service_name: inline? ? 'risc_http_push_direct' : 'risc_http_push_async',
}
response = rate_limiter(push_notification_url).attempt!(now) do
faraday.post(
push_notification_url,
jwt,
'Accept' => 'application/json',
'Content-Type' => 'application/secevent+jwt',
) do |req|
req.options.context = {
service_name: inline? ? 'risc_http_push_direct' : 'risc_http_push_async',
}
end
end

unless response.success?
Expand All @@ -31,12 +40,12 @@ def perform(
}.to_json,
)
end
rescue Faraday::TimeoutError, Faraday::ConnectionFailed => err
rescue Faraday::TimeoutError, Faraday::ConnectionFailed, RedisRateLimiter::LimitError => err
raise err if !inline?

Rails.logger.warn(
{
event: 'http_push_error',
event: err.is_a?(RedisRateLimiter::LimitError) ? 'http_push_rate_limit' : 'http_push_error',
transport: 'direct',
event_type: event_type,
service_provider: issuer,
Expand All @@ -45,6 +54,18 @@ def perform(
)
end

def rate_limiter(url)
url_overrides = IdentityConfig.store.risc_notifications_rate_limit_overrides.fetch(url, {})

RedisRateLimiter.new(
key: "push-notification-#{url}",
max_requests: url_overrides['max_requests'] ||
IdentityConfig.store.risc_notifications_rate_limit_max_requests,
interval: url_overrides['interval'] ||
IdentityConfig.store.risc_notifications_rate_limit_interval,
)
end

def faraday
Faraday.new do |f|
f.request :instrumentation, name: 'request_log.faraday'
Expand Down
4 changes: 2 additions & 2 deletions app/services/encrypted_redis_struct_storage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ module EncryptedRedisStructStorage
def load(id, type:)
check_for_id_property!(type)

ciphertext = REDIS_POOL.with { |client| client.read(key(id, type: type)) }
ciphertext = READTHIS_POOL.with { |client| client.read(key(id, type: type)) }
return nil if ciphertext.blank?

json = Encryption::Encryptors::SessionEncryptor.new.decrypt(ciphertext)
Expand Down Expand Up @@ -51,7 +51,7 @@ def store(struct, expires_in: 60)

payload.transform_values!(&utf_8_encode_strs)

REDIS_POOL.with do |client|
READTHIS_POOL.with do |client|
client.write(
key(struct.id, type: struct.class),
Encryption::Encryptors::SessionEncryptor.new.encrypt(payload.to_json),
Expand Down
52 changes: 52 additions & 0 deletions app/services/redis_rate_limiter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Implementation of https://redis.com/redis-best-practices/basic-rate-limiting/
class RedisRateLimiter
class LimitError < StandardError; end

attr_reader :key, :max_requests, :interval, :redis_pool

# @param [String] key the item to throttle on
# @param [Integer] max_requests the max number of requests allowed per interval
# @param [Integer] interval number of seconds
def initialize(key:, max_requests:, interval:, redis_pool: REDIS_POOL)
@key = key
@max_requests = max_requests
@interval = interval.to_i
@redis_pool = redis_pool
end

# @yield a block to run if the limit has not been hit
# @raise [LimitError] throws an error when the limit has been hit, and the
# block was not run
def attempt!(now = Time.zone.now)
raise LimitError, "rate limit for #{key} has maxed out" if maxed?(now)

increment(now)

yield
end

# @return [Boolean]
def maxed?(now = Time.zone.now)
redis_pool.with do |redis|
redis.get(build_key(now)).to_i >= max_requests
end
end

def increment(now = Time.zone.now)
rate_limit_key = build_key(now)

redis_pool.with do |redis|
redis.multi do
redis.incr(rate_limit_key)
redis.expire(rate_limit_key, interval - 1)
end
end
end

# @api private
# @return [String]
def build_key(now)
rounded_seconds = (now.to_i / interval) * interval
"redis-rate-limiter:#{key}:#{rounded_seconds}"
end
end
8 changes: 4 additions & 4 deletions app/services/service_provider_request_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ def self.from_uuid(uuid)

def self.delete(request_id)
return unless request_id
REDIS_POOL.with do |client|
READTHIS_POOL.with do |client|
client.delete(key(request_id))
self.redis_last_uuid = nil if Rails.env.test?
end
end

def self.find_by(uuid:)
return if uuid.blank?
obj = REDIS_POOL.with { |client| client.read(key(uuid)) }
obj = READTHIS_POOL.with { |client| client.read(key(uuid)) }
obj ? hash_to_spr(obj, uuid) : nil
end

Expand Down Expand Up @@ -56,7 +56,7 @@ def self.create(hash)
end

def self.write(obj, uuid)
REDIS_POOL.with do |client|
READTHIS_POOL.with do |client|
client.write(key(uuid), obj)
self.redis_last_uuid = uuid if Rails.env.test?
end
Expand All @@ -76,7 +76,7 @@ def self.key(uuid)
end

def self.flush
REDIS_POOL.with(&:clear) if Rails.env.test?
READTHIS_POOL.with(&:clear) if Rails.env.test?
end

def self.hash_to_spr(hash, uuid)
Expand Down
3 changes: 3 additions & 0 deletions config/application.yml.default
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ reset_password_email_max_attempts: '20'
reset_password_email_window_in_minutes: '60'
risc_notifications_local_enabled: 'false'
risc_notifications_active_job_enabled: 'false'
risc_notifications_rate_limit_interval: '60'
risc_notifications_rate_limit_max_requests: '1000'
risc_notifications_rate_limit_overrides: '{"https://example.com/push":{"interval":120,"max_requests":10000}}'
ruby_workers_enabled: 'true'
rules_of_use_horizon_years: '6'
rules_of_use_updated_at: '2021-05-21T00:00:00Z'
Expand Down
7 changes: 6 additions & 1 deletion config/initializers/redis.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
REDIS_POOL = ConnectionPool.new(size: 10) do
READTHIS_POOL = ConnectionPool.new(size: 10) do
# LG-5030: remove Readthis gem
Readthis::Cache.new(
expires_in: IdentityConfig.store.service_provider_request_ttl_hours.hours.to_i,
redis: { url: IdentityConfig.store.redis_url, driver: :hiredis },
)
end

REDIS_POOL = ConnectionPool.new(size: 10) do
Redis.new(url: IdentityConfig.store.redis_url)
end
3 changes: 3 additions & 0 deletions lib/identity_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ def self.build_store(config_map)
config.add(:reset_password_email_window_in_minutes, type: :integer)
config.add(:risc_notifications_local_enabled, type: :boolean)
config.add(:risc_notifications_active_job_enabled, type: :boolean)
config.add(:risc_notifications_rate_limit_interval, type: :integer)
config.add(:risc_notifications_rate_limit_max_requests, type: :integer)
config.add(:risc_notifications_rate_limit_overrides, type: :json)
config.add(:ruby_workers_enabled, type: :boolean)
config.add(:rules_of_use_horizon_years, type: :integer)
config.add(:rules_of_use_updated_at, type: :timestamp)
Expand Down
55 changes: 54 additions & 1 deletion spec/jobs/risc_delivery_job_spec.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
require 'rails_helper'

RSpec.describe RiscDeliveryJob do
around do |ex|
REDIS_POOL.with { |r| r.flushdb }
ex.run
REDIS_POOL.with { |r| r.flushdb }
end

describe '#perform' do
let(:push_notification_url) { 'https://push.example.gov' }
let(:jwt) { JWT.encode({ foo: 'bar' }, 'a') }
let(:event_type) { PushNotification::IdentifierRecycledEvent::EVENT_TYPE }
let(:issuer) { 'issuer1' }
let(:transport) { 'ruby_worker' }
let(:now) { 5.hours.ago }

let(:job) { RiscDeliveryJob.new }
subject(:perform) do
Expand All @@ -15,6 +21,7 @@
jwt: jwt,
event_type: event_type,
issuer: issuer,
now: now,
)
end

Expand Down Expand Up @@ -64,5 +71,51 @@
end
end
end

context 'rate limiting' do
before do
REDIS_POOL.with { |r| r.set(job.rate_limiter(push_notification_url).build_key(now), 9999) }
end

context 'when performed inline' do
it 'warns on limit hit' do
expect(Rails.logger).to receive(:warn) do |msg|
payload = JSON.parse(msg, symbolize_names: true)

expect(payload[:event]).to eq('http_push_rate_limit')
expect(payload[:transport]).to eq('direct')
end

expect { perform }.to_not raise_error
end
end

context 'when performed in a worker' do
before do
allow(job).to receive(:queue_adapter).
and_return(ActiveJob::QueueAdapters::GoodJobAdapter.new)
end

it 'raises on rate limit errors (and retries via ActiveJob)' do
expect(Rails.logger).to_not receive(:warn)

expect { perform }.to raise_error(RedisRateLimiter::LimitError)
end
end

context 'when the rate limit is overridden' do
before do
allow(IdentityConfig.store).to receive(:risc_notifications_rate_limit_overrides).
and_return({ push_notification_url => { 'max_requests' => 1e6, 'interval' => 500 } })
end

it 'allows the request' do
req = stub_request(:post, push_notification_url)
perform

expect(req).to have_been_requested
end
end
end
end
end
2 changes: 1 addition & 1 deletion spec/models/document_capture_session_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

result_id = record.result_id
key = EncryptedRedisStructStorage.key(result_id, type: DocumentCaptureSessionResult)
data = REDIS_POOL.with { |client| client.read(key) }
data = READTHIS_POOL.with { |client| client.read(key) }
expect(data).to be_a(String)
expect(data).to_not include('Testy')
expect(data).to_not include('Testerson')
Expand Down
2 changes: 1 addition & 1 deletion spec/rails_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
Rails.application.load_seed

begin
REDIS_POOL.with { |cache| cache.pool.with(&:info) }
READTHIS_POOL.with { |cache| cache.pool.with(&:info) }
rescue RuntimeError => error
puts error
puts 'It appears Redis is not running, but it is required for (some) specs to run'
Expand Down
4 changes: 2 additions & 2 deletions spec/services/encrypted_redis_struct_storage_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def self.redis_key_prefix
struct_class.new(id: id, a: 'value for a', b: 'value for b', c: 'value for c'),
)

data = REDIS_POOL.with do |client|
data = READTHIS_POOL.with do |client|
client.read(EncryptedRedisStructStorage.key(id, type: struct_class))
end

Expand All @@ -134,7 +134,7 @@ def self.redis_key_prefix
struct_class.new(id: id, a: 'value for a', b: 'value for b', c: 'value for c'),
)

ttl = REDIS_POOL.with do |client|
ttl = READTHIS_POOL.with do |client|
client.pool.with do |redis|
redis.ttl(EncryptedRedisStructStorage.key(id, type: struct_class))
end
Expand Down
Loading