Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

follow up on ways to reduce allocation on multi_get #45

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions bin/profile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#
# run with:
# RUBY_YJIT_ENABLE=1 bundle exec bin/profile
# view profiles locally with (add to gemfile for easy access):
# bundle exec profile-viewer client_get_multi_allocations.json
require 'bundler/inline'
require 'json'

Expand Down Expand Up @@ -118,6 +120,22 @@ def sock_set_multi(sock, pairs)
sock.gets(TERMINATOR) # clear the buffer
end

def allocations
x = GC.stat(:total_allocated_objects)
yield
GC.stat(:total_allocated_objects) - x
end

if bench_target == 'get_multi_allocations'
start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
puts "allocations:"
puts allocations { client.get_multi(pairs.keys) }
Vernier.profile(out: 'client_get_multi_allocations.json', allocation_interval: 1) do
client.get_multi(pairs.keys) while Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time < bench_time
end
exit 0
end

if %w[all get].include?(bench_target)
Vernier.profile(out: 'client_get_profile.json') do
start_time = Time.now
Expand Down
2 changes: 1 addition & 1 deletion lib/dalli/protocol/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@ def quiet?
alias multi? quiet?

# NOTE: Additional public methods should be overridden in Dalli::Threadsafe
ALLOWED_QUIET_OPS = %i[add replace set delete incr decr append prepend flush noop].freeze

private

ALLOWED_QUIET_OPS = %i[add replace set delete incr decr append prepend flush noop].freeze
def verify_allowed_quiet!(opkey)
return if ALLOWED_QUIET_OPS.include?(opkey)

Expand Down
12 changes: 9 additions & 3 deletions lib/dalli/protocol/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module Protocol
# Manages the socket connection to the server, including ensuring liveness
# and retries.
##
class ConnectionManager
class ConnectionManager # rubocop:disable Metrics/ClassLength
DEFAULTS = {
# seconds between trying to contact a remote server
down_retry_delay: 30,
Expand Down Expand Up @@ -160,14 +160,20 @@ def read_line
error_on_request!(e)
end

def read_byte
@sock.readbyte
rescue SystemCallError, *TIMEOUT_ERRORS, EOFError => e
error_on_request!(e)
end

def read(count)
@sock.read(count)
rescue SystemCallError, *TIMEOUT_ERRORS, EOFError => e
error_on_request!(e)
end

def read_exact(count)
@sock.read(count)
def read_to_outstring(count, outstring)
@sock.read(count, outstring)
rescue SystemCallError, *TIMEOUT_ERRORS, EOFError => e
error_on_request!(e)
end
Expand Down
56 changes: 41 additions & 15 deletions lib/dalli/protocol/meta.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,15 @@ module Protocol
# protocol. Contains logic for managing connection state to the server (retries, etc),
# formatting requests to the server, and unpacking responses.
##
class Meta < Base
class Meta < Base # rubocop:disable Metrics/ClassLength
TERMINATOR = "\r\n"
META_NOOP = "mn\r\n"
META_NOOP_RESP = 'MN'
META_VALUE_RESP = 'VA'
META_GET_REQ_NO_FLAGS = "v k q\r\n"
META_GET_REQ = "v f k q\r\n"
M_CONSTANT = 'M'.ord
V_CONSTANT = 'V'.ord
SUPPORTS_CAPACITY = Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.4.0')

def response_processor
Expand Down Expand Up @@ -57,27 +64,46 @@ def read_multi_req(keys)
# Pre-allocate the results hash with expected size
results = SUPPORTS_CAPACITY ? Hash.new(nil, capacity: keys.size) : {}
optimized_for_raw = @value_marshaller.raw_by_default
key_index = optimized_for_raw ? 2 : 3
terminator_buffer = String.new(TERMINATOR, capacity: TERMINATOR.size)

post_get_req = optimized_for_raw ? "v k q\r\n" : "v f k q\r\n"
post_get_req = optimized_for_raw ? META_GET_REQ_NO_FLAGS : META_GET_REQ
keys.each do |key|
@connection_manager.write("mg #{key} #{post_get_req}")
end
@connection_manager.write("mn\r\n")
@connection_manager.write(META_NOOP)
@connection_manager.flush

terminator_length = TERMINATOR.length
while (line = @connection_manager.readline)
break if line == TERMINATOR || line[0, 2] == 'MN'
next unless line[0, 3] == 'VA '

# VA value_length flags key
tokens = line.split
value = @connection_manager.read_exact(tokens[1].to_i)
bitflags = optimized_for_raw ? 0 : @response_processor.bitflags_from_tokens(tokens)
@connection_manager.read_exact(terminator_length) # read the terminator
results[tokens[key_index].byteslice(1..-1)] =
@value_marshaller.retrieve(value, bitflags)
while (byte = @connection_manager.read_byte)
if byte == M_CONSTANT
@connection_manager.readline
break
end
unless byte == V_CONSTANT
@connection_manager.readline
next
end

@connection_manager.read_byte # skip 'A'
@connection_manager.read_byte # skip terminator
line = @connection_manager.readline
# VA value_length kNAME\r\n
# if rindex and linex are equal split out flags
right_seperator_index = line.rindex(' ')
left_seperator_index = line.index(' ')
bitflags = if right_seperator_index == left_seperator_index
0
else
line.byteslice(left_seperator_index + 2, right_seperator_index - left_seperator_index - 1).to_i
end

# +2 on index skips the space and 'k', then - 4 for the ' k' and "\r\n"
key = line.byteslice(right_seperator_index + 2, (line.length - right_seperator_index - 4))

value = @connection_manager.read(line.to_i)
@connection_manager.read_to_outstring(terminator_length, terminator_buffer)
results[key] =
bitflags.zero? ? value : @value_marshaller.retrieve(value, bitflags)
end
results
end
Expand Down
3 changes: 2 additions & 1 deletion lib/dalli/protocol/meta/response_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class ResponseProcessor
def initialize(io_source, value_marshaller)
@io_source = io_source
@value_marshaller = value_marshaller
@terminator_buffer = String.new(TERMINATOR, capacity: TERMINATOR.size)
end

def meta_get_with_value(cache_nils: false, skip_flags: false)
Expand Down Expand Up @@ -249,7 +250,7 @@ def next_line_to_tokens

def read_data(data_size)
resp_data = @io_source.read(data_size)
@io_source.read(TERMINATOR.bytesize)
@io_source.read_to_outstring(TERMINATOR.bytesize, @terminator_buffer)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this removes an additional allocation for all the non multi calls

resp_data
end
end
Expand Down
8 changes: 8 additions & 0 deletions test/integration/test_pipelined_get.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,18 @@
dc.set('a', 'foo')
dc.set('b', 'bar')

dc.set('a_really_big', 'foo')
dc.set('big_big_really_long_key', 'bar')

resp = dc.get_multi(keys_to_query)
expected_resp = { 'a' => 'foo', 'b' => 'bar' }

assert_equal(expected_resp, resp)

resp = dc.get_multi(%w[a_really_big big_big_really_long_key])
expected_resp = { 'a_really_big' => 'foo', 'big_big_really_long_key' => 'bar' }

assert_equal(expected_resp, resp)
end
end

Expand Down
Loading