-
Notifications
You must be signed in to change notification settings - Fork 1k
/
redis.rb
194 lines (165 loc) · 5.33 KB
/
redis.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# frozen_string_literal: true
require "monitor"
require "redis/errors"
require "redis/commands"
class Redis
BASE_PATH = __dir__
Deprecated = Class.new(StandardError)
class << self
attr_accessor :silence_deprecations, :raise_deprecations
def deprecate!(message)
unless silence_deprecations
if raise_deprecations
raise Deprecated, message
else
::Kernel.warn(message)
end
end
end
end
# soft-deprecated
# We added this back for older sidekiq releases
module Connection
class << self
def drivers
[RedisClient.default_driver]
end
end
end
include Commands
SERVER_URL_OPTIONS = %i(url host port path).freeze
# Create a new client instance
#
# @param [Hash] options
# @option options [String] :url (value of the environment variable REDIS_URL) a Redis URL, for a TCP connection:
# `redis://:[password]@[hostname]:[port]/[db]` (password, port and database are optional), for a unix socket
# connection: `unix://[path to Redis socket]`. This overrides all other options.
# @option options [String] :host ("127.0.0.1") server hostname
# @option options [Integer] :port (6379) server port
# @option options [String] :path path to server socket (overrides host and port)
# @option options [Float] :timeout (1.0) timeout in seconds
# @option options [Float] :connect_timeout (same as timeout) timeout for initial connect in seconds
# @option options [String] :username Username to authenticate against server
# @option options [String] :password Password to authenticate against server
# @option options [Integer] :db (0) Database to select after connect and on reconnects
# @option options [Symbol] :driver Driver to use, currently supported: `:ruby`, `:hiredis`
# @option options [String] :id ID for the client connection, assigns name to current connection by sending
# `CLIENT SETNAME`
# @option options [Integer, Array<Integer, Float>] :reconnect_attempts Number of attempts trying to connect,
# or a list of sleep duration between attempts.
# @option options [Boolean] :inherit_socket (false) Whether to use socket in forked process or not
# @option options [String] :name The name of the server group to connect to.
# @option options [Array] :sentinels List of sentinels to contact
#
# @return [Redis] a new client instance
def initialize(options = {})
@monitor = Monitor.new
@options = options.dup
@options[:reconnect_attempts] = 1 unless @options.key?(:reconnect_attempts)
if ENV["REDIS_URL"] && SERVER_URL_OPTIONS.none? { |o| @options.key?(o) }
@options[:url] = ENV["REDIS_URL"]
end
inherit_socket = @options.delete(:inherit_socket)
@subscription_client = nil
@client = initialize_client(@options)
@client.inherit_socket! if inherit_socket
end
# Run code without the client reconnecting
def without_reconnect(&block)
@client.disable_reconnection(&block)
end
# Test whether or not the client is connected
def connected?
@client.connected? || @subscription_client&.connected?
end
# Disconnect the client as quickly and silently as possible.
def close
@client.close
@subscription_client&.close
end
alias disconnect! close
def with
yield self
end
def _client
@client
end
def pipelined(exception: true)
synchronize do |client|
client.pipelined(exception: exception) do |raw_pipeline|
yield PipelinedConnection.new(raw_pipeline, exception: exception)
end
end
end
def id
@client.id || @client.server_url
end
def inspect
"#<Redis client v#{Redis::VERSION} for #{id}>"
end
def dup
self.class.new(@options)
end
def connection
{
host: @client.host,
port: @client.port,
db: @client.db,
id: id,
location: "#{@client.host}:#{@client.port}"
}
end
private
def initialize_client(options)
if options.key?(:cluster)
raise "Redis Cluster support was moved to the `redis-clustering` gem."
end
if options.key?(:sentinels)
Client.sentinel(**options).new_client
else
Client.config(**options).new_client
end
end
def synchronize
@monitor.synchronize { yield(@client) }
end
def send_command(command, &block)
@monitor.synchronize do
@client.call_v(command, &block)
end
rescue ::RedisClient::Error => error
Client.translate_error!(error)
end
def send_blocking_command(command, timeout, &block)
@monitor.synchronize do
@client.blocking_call_v(timeout, command, &block)
end
end
def _subscription(method, timeout, channels, block)
if block
if @subscription_client
raise SubscriptionError, "This client is already subscribed"
end
begin
@subscription_client = SubscribedClient.new(@client.pubsub)
if timeout > 0
@subscription_client.send(method, timeout, *channels, &block)
else
@subscription_client.send(method, *channels, &block)
end
ensure
@subscription_client&.close
@subscription_client = nil
end
else
unless @subscription_client
raise SubscriptionError, "This client is not subscribed"
end
@subscription_client.call_v([method].concat(channels))
end
end
end
require "redis/version"
require "redis/client"
require "redis/pipeline"
require "redis/subscribe"