Skip to content

Commit

Permalink
Server/Client: add each_key support
Browse files Browse the repository at this point in the history
  • Loading branch information
asppsa committed Sep 6, 2019
1 parent 6b8de54 commit 1da6a1a
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 15 deletions.
3 changes: 2 additions & 1 deletion feature_matrix.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,14 @@ backends:
- adapter: Client
platforms: [ MRI, JRuby ]
features: [multiprocess]
unknown: [ increment, create, expires, persist ]
unknown: [ increment, create, expires, persist, each_key ]
description: "Moneta client adapter"
notes:
increment: depends on server
create: depends on server
expires: depends on server
persist: depends on server
each_key: depends on server
- adapter: RestClient
platforms: [ MRI, JRuby ]
features: [ multiprocess ]
Expand Down
31 changes: 31 additions & 0 deletions lib/moneta/adapters/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,37 @@ def close
nil
end

# (see Proxy#each_key)
def each_key
raise NotImplementedError, 'each_key is not supported' unless supports?(:each_key)
return enum_for(:each_key) unless block_given?

begin
write(:each_key)
yield_break = false

loop do
write('NEXT')

# A StopIteration error will be raised by this call if the server
# reached the end of the enumeration. This will stop the loop
# automatically.
result = read_msg

# yield_break will be true in the ensure block (below) if anything
# happened during the yield to stop further enumeration.
yield_break = true
yield result
yield_break = false
end
ensure
write('BREAK') if yield_break
read_msg # nil return from each_key
end

self
end

# (see Default#features)
def features
@features ||=
Expand Down
7 changes: 6 additions & 1 deletion lib/moneta/lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@ def initialize(adapter, options = {})
protected

def wrap(name, *args, &block)
self.locks ||= Set.new
if locked?
yield
else
lock!(&block)
end
end

def locks=(locks)
Thread.current.thread_variable_set('Moneta::Lock', locks)
end

def locks
Thread.current['Moneta::Lock'] ||= Set.new
Thread.current.thread_variable_get('Moneta::Lock')
end

def lock!(&block)
Expand Down
31 changes: 27 additions & 4 deletions lib/moneta/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ def mainloop

def dispatch(method, args)
case method
when :key?, :load, :delete, :increment, :create
when :key?, :load, :delete, :increment, :create, :features
@store.send(method, *args)
when :features
# all features except each_key are supported
@store.features - [:each_key]
when :store, :clear
@store.send(method, *args)
nil
when :each_key
yield_each(@store.each_key)
nil
end
rescue => e
e
Expand Down Expand Up @@ -119,6 +119,29 @@ def close(raising: nil)
@io.close rescue nil
raise Closed, raising if raising
end

def yield_each(enumerator)
received_break = false
loop do
case msg = read_msg
when %w{NEXT}
# This will raise a StopIteration at the end of the enumeration,
# which will exit the loop.
write(enumerator.next)
when %w{BREAK}
# This is received when the client wants to stop the enumeration.
received_break = true
break
else
# Otherwise, the client is attempting to call another method within
# an `each` block.
write_dispatch(msg)
end
end
ensure
# This tells the client to stop enumerating
write(StopIteration.new("Server initiated stop")) unless received_break
end
end

# @param [Hash] options
Expand Down
2 changes: 0 additions & 2 deletions lib/moneta/shared.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ module Moneta
#
# @api public
class Shared < Wrapper
not_supports :each_key

# @param [Hash] options
# @option options [Integer] :port (9000) TCP port
# @option options [String] :host Server hostname
Expand Down
5 changes: 5 additions & 0 deletions spec/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,13 @@ def start_server(*args)
server = Moneta::Server.new(*args)
Thread.new { server.run }
sleep 0.1 until server.running?
server
rescue Exception => ex
puts "Failed to start server - #{ex.message}"
tries ||= 0
tries += 1
sleep Moneta::Server::TIMEOUT
tries < 3 ? retry : raise
end

def moneta_property_of(keys: 0, values: 0)
Expand Down
8 changes: 6 additions & 2 deletions spec/moneta/adapters/client/adapter_client_spec.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
describe 'adapter_client', isolate: true, adapter: :Client do
before :all do
start_server(Moneta::Adapters::Memory.new)
@server = start_server(Moneta::Adapters::Memory.new)
end

after :all do
@server.stop
end

moneta_build do
Moneta::Adapters::Client.new
end

moneta_specs ADAPTER_SPECS
moneta_specs ADAPTER_SPECS.with_each_key
end
6 changes: 5 additions & 1 deletion spec/moneta/adapters/client/standard_client_tcp_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
describe "standard_client_tcp", isolate: true, adapter: :Client do
before :all do
start_server(Moneta::Adapters::Memory.new)
@server = start_server(Moneta::Adapters::Memory.new)
end

after :all do
@server.stop
end

moneta_store :Client
Expand Down
8 changes: 6 additions & 2 deletions spec/moneta/adapters/client/standard_client_unix_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
describe "standard_client_unix", isolate: true, adapter: :Client do
before :all do
start_server Moneta::Adapters::Memory.new,
socket: File.join(tempdir, 'standard_client_unix')
@server = start_server Moneta::Adapters::Memory.new,
socket: File.join(tempdir, 'standard_client_unix')
end

after :all do
@server.stop
end

moneta_store :Client do
Expand Down
4 changes: 2 additions & 2 deletions spec/moneta/proxies/shared/shared_tcp_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
tempdir = self.tempdir
Moneta.build do
use(:Shared, port: 9001) do
adapter :PStore, file: File.join(tempdir, 'shared_tcp')
adapter :GDBM, file: File.join(tempdir, 'shared_tcp')
end
end
end

shared_examples :shared_tcp do
moneta_specs ADAPTER_SPECS
moneta_specs ADAPTER_SPECS.with_each_key

it 'shares values' do
store['shared_key'] = 'shared_value'
Expand Down

0 comments on commit 1da6a1a

Please sign in to comment.