Skip to content

Commit

Permalink
Limit number of redis connections and fix ttl bug (#75)
Browse files Browse the repository at this point in the history
* Attempting a quick workaround for gush creating a massive number of redis connection.

With this design, each thread has its own redis connection (which is replaced if for some reason
a new redis url is seen (should be unlikely))

* Set TTL on job key based on class not name (name includes uuid which doesn't exist in redis). Added tests to verify as well.

* Clean up dependencies and fix travis CI
  • Loading branch information
anirbanmu authored and pokonski committed Jan 21, 2020
1 parent 682c7fe commit 7c89931
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 61 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ tmp
test.rb
/Gushfile
dump.rdb
.ruby-version
.ruby-gemset
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ rvm:
- 2.2.2
- 2.3.4
- 2.4.1
- 2.5
- 2.6
- 2.7
services:
- redis-server
email:
Expand Down
4 changes: 2 additions & 2 deletions gush.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]

spec.add_dependency "activejob", ">= 4.2.7", "< 6.0"
spec.add_dependency "connection_pool", "~> 2.2.1"
spec.add_dependency "concurrent-ruby", "~> 1.0"
spec.add_dependency "multi_json", "~> 1.11"
spec.add_dependency "redis", ">= 3.2", "< 5"
spec.add_dependency "redis-mutex", "~> 4.0.1"
Expand All @@ -28,7 +28,7 @@ Gem::Specification.new do |spec|
spec.add_dependency "colorize", "~> 0.7"
spec.add_dependency "thor", "~> 0.19"
spec.add_dependency "launchy", "~> 2.4"
spec.add_development_dependency "bundler", "~> 1.5"
spec.add_development_dependency "bundler"
spec.add_development_dependency "rake", "~> 10.4"
spec.add_development_dependency "rspec", '~> 3.0'
spec.add_development_dependency "pry", '~> 0.10'
Expand Down
96 changes: 39 additions & 57 deletions lib/gush/client.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
require 'connection_pool'
require 'redis'
require 'concurrent-ruby'

module Gush
class Client
attr_reader :configuration

@@redis_connection = Concurrent::ThreadLocalVar.new(nil)

def self.redis_connection(config)
cached = (@@redis_connection.value ||= { url: config.redis_url, connection: nil })
return cached[:connection] if !cached[:connection].nil? && config.redis_url == cached[:url]

Redis.new(url: config.redis_url).tap do |instance|
RedisClassy.redis = instance
@@redis_connection.value = { url: config.redis_url, connection: instance }
end
end

def initialize(config = Gush.configuration)
@configuration = config
end
Expand Down Expand Up @@ -47,9 +60,7 @@ def next_free_job_id(workflow_id, job_klass)

loop do
job_id = SecureRandom.uuid
available = connection_pool.with do |redis|
!redis.hexists("gush.jobs.#{workflow_id}.#{job_klass}", job_id)
end
available = !redis.hexists("gush.jobs.#{workflow_id}.#{job_klass}", job_id)

break if available
end
Expand All @@ -61,9 +72,7 @@ def next_free_workflow_id
id = nil
loop do
id = SecureRandom.uuid
available = connection_pool.with do |redis|
!redis.exists("gush.workflow.#{id}")
end
available = !redis.exists("gush.workflow.#{id}")

break if available
end
Expand All @@ -72,37 +81,31 @@ def next_free_workflow_id
end

def all_workflows
connection_pool.with do |redis|
redis.scan_each(match: "gush.workflows.*").map do |key|
id = key.sub("gush.workflows.", "")
find_workflow(id)
end
redis.scan_each(match: "gush.workflows.*").map do |key|
id = key.sub("gush.workflows.", "")
find_workflow(id)
end
end

def find_workflow(id)
connection_pool.with do |redis|
data = redis.get("gush.workflows.#{id}")
data = redis.get("gush.workflows.#{id}")

unless data.nil?
hash = Gush::JSON.decode(data, symbolize_keys: true)
keys = redis.scan_each(match: "gush.jobs.#{id}.*")
unless data.nil?
hash = Gush::JSON.decode(data, symbolize_keys: true)
keys = redis.scan_each(match: "gush.jobs.#{id}.*")

nodes = keys.each_with_object([]) do |key, array|
array.concat redis.hvals(key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
end

workflow_from_hash(hash, nodes)
else
raise WorkflowNotFound.new("Workflow with given id doesn't exist")
nodes = keys.each_with_object([]) do |key, array|
array.concat redis.hvals(key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
end

workflow_from_hash(hash, nodes)
else
raise WorkflowNotFound.new("Workflow with given id doesn't exist")
end
end

def persist_workflow(workflow)
connection_pool.with do |redis|
redis.set("gush.workflows.#{workflow.id}", workflow.to_json)
end
redis.set("gush.workflows.#{workflow.id}", workflow.to_json)

workflow.jobs.each {|job| persist_job(workflow.id, job) }
workflow.mark_as_persisted
Expand All @@ -111,9 +114,7 @@ def persist_workflow(workflow)
end

def persist_job(workflow_id, job)
connection_pool.with do |redis|
redis.hset("gush.jobs.#{workflow_id}.#{job.klass}", job.id, job.to_json)
end
redis.hset("gush.jobs.#{workflow_id}.#{job.klass}", job.id, job.to_json)
end

def find_job(workflow_id, job_name)
Expand All @@ -132,31 +133,23 @@ def find_job(workflow_id, job_name)
end

def destroy_workflow(workflow)
connection_pool.with do |redis|
redis.del("gush.workflows.#{workflow.id}")
end
redis.del("gush.workflows.#{workflow.id}")
workflow.jobs.each {|job| destroy_job(workflow.id, job) }
end

def destroy_job(workflow_id, job)
connection_pool.with do |redis|
redis.del("gush.jobs.#{workflow_id}.#{job.klass}")
end
redis.del("gush.jobs.#{workflow_id}.#{job.klass}")
end

def expire_workflow(workflow, ttl=nil)
ttl = ttl || configuration.ttl
connection_pool.with do |redis|
redis.expire("gush.workflows.#{workflow.id}", ttl)
end
redis.expire("gush.workflows.#{workflow.id}", ttl)
workflow.jobs.each {|job| expire_job(workflow.id, job, ttl) }
end

def expire_job(workflow_id, job, ttl=nil)
ttl = ttl || configuration.ttl
connection_pool.with do |redis|
redis.expire("gush.jobs.#{workflow_id}.#{job.name}", ttl)
end
redis.expire("gush.jobs.#{workflow_id}.#{job.klass}", ttl)
end

def enqueue_job(workflow_id, job)
Expand All @@ -172,16 +165,11 @@ def enqueue_job(workflow_id, job)
def find_job_by_klass_and_id(workflow_id, job_name)
job_klass, job_id = job_name.split('|')

connection_pool.with do |redis|
redis.hget("gush.jobs.#{workflow_id}.#{job_klass}", job_id)
end
redis.hget("gush.jobs.#{workflow_id}.#{job_klass}", job_id)
end

def find_job_by_klass(workflow_id, job_name)
new_cursor, result = connection_pool.with do |redis|
redis.hscan("gush.jobs.#{workflow_id}.#{job_name}", 0, count: 1)
end

new_cursor, result = redis.hscan("gush.jobs.#{workflow_id}.#{job_name}", 0, count: 1)
return nil if result.empty?

job_id, job = *result[0]
Expand All @@ -202,14 +190,8 @@ def workflow_from_hash(hash, nodes = [])
flow
end

def build_redis
Redis.new(url: configuration.redis_url).tap do |instance|
RedisClassy.redis = instance
end
end

def connection_pool
@connection_pool ||= ConnectionPool.new(size: configuration.concurrency, timeout: 1) { build_redis }
def redis
self.class.redis_connection(configuration)
end
end
end
10 changes: 8 additions & 2 deletions spec/gush/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,18 @@
end

describe "#expire_workflow" do
let(:ttl) { 2000 }

it "sets TTL for all Redis keys related to the workflow" do
workflow = TestWorkflow.create

client.expire_workflow(workflow, -1)
client.expire_workflow(workflow, ttl)

expect(redis.ttl("gush.workflows.#{workflow.id}")).to eq(ttl)

# => TODO - I believe fakeredis does not handle TTL the same.
workflow.jobs.each do |job|
expect(redis.ttl("gush.jobs.#{workflow.id}.#{job.klass}")).to eq(ttl)
end
end
end

Expand Down

0 comments on commit 7c89931

Please sign in to comment.