Skip to content

Commit

Permalink
fix to initialize storage config sections in default (parser/filter a…
Browse files Browse the repository at this point in the history
…re done in past)
  • Loading branch information
tagomoris committed Dec 19, 2016
1 parent 792bb6d commit feb2a6e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
30 changes: 21 additions & 9 deletions lib/fluent/plugin_helper/storage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ module Storage
StorageState = Struct.new(:storage, :running)

def storage_create(usage: '', type: nil, conf: nil, default_type: nil)
if conf && !conf.arg.empty?
usage = conf.arg
end

s = @_storages[usage]
if s && s.running
return s.storage
Expand Down Expand Up @@ -72,7 +76,7 @@ def storage_create(usage: '', type: nil, conf: nil, default_type: nil)
module StorageParams
include Fluent::Configurable
# minimum section definition to instantiate storage plugin instances
config_section :storage, required: false, multi: true, param_name: :storage_configs do
config_section :storage, required: false, multi: true, param_name: :storage_configs, init: true do
config_argument :usage, :string, default: ''
config_param :@type, :string, default: Fluent::Plugin::Storage::DEFAULT_TYPE
end
Expand Down Expand Up @@ -194,6 +198,10 @@ def initialize(storage)
def_delegators :@storage, :start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate
def_delegators :@storage, :started?, :stopped?, :before_shutdown?, :shutdown?, :after_shutdown?, :closed?, :terminated?

def method_missing(name, *args)
@monitor.synchronize{ @storage.__send__(name, *args) }
end

def persistent_always?
true
end
Expand Down Expand Up @@ -274,14 +282,18 @@ class SynchronizeWrapper

def initialize(storage)
@storage = storage
@mutex = Mutex.new
@monitor = Monitor.new
end

def_delegators :@storage, :persistent, :autosave, :autosave_interval, :save_at_shutdown
def_delegators :@storage, :persistent_always?
def_delegators :@storage, :start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate
def_delegators :@storage, :started?, :stopped?, :before_shutdown?, :shutdown?, :after_shutdown?, :closed?, :terminated?

def method_missing(name, *args)
@monitor.synchronize{ @storage.__send__(name, *args) }
end

def synchronized?
true
end
Expand All @@ -291,35 +303,35 @@ def implementation
end

def load
@mutex.synchronize do
@monitor.synchronize do
@storage.load
end
end

def save
@mutex.synchronize do
@monitor.synchronize do
@storage.save
end
end

def get(key)
@mutex.synchronize{ @storage.get(key) }
@monitor.synchronize{ @storage.get(key) }
end

def fetch(key, defval)
@mutex.synchronize{ @storage.fetch(key, defval) }
@monitor.synchronize{ @storage.fetch(key, defval) }
end

def put(key, value)
@mutex.synchronize{ @storage.put(key, value) }
@monitor.synchronize{ @storage.put(key, value) }
end

def delete(key)
@mutex.synchronize{ @storage.delete(key) }
@monitor.synchronize{ @storage.delete(key) }
end

def update(key, &block)
@mutex.synchronize do
@monitor.synchronize do
v = block.call(@storage.get(key))
@storage.put(key, v)
v
Expand Down
7 changes: 4 additions & 3 deletions test/plugin_helper/test_storage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ class Dummy2 < Fluent::Plugin::TestBase
test 'can override default configuration parameters, but not overwrite whole definition' do
d = Dummy.new
d.configure(config_element())
assert_equal [], d.storage_configs
assert_equal 1, d.storage_configs.size
assert_equal 'local', d.storage_configs.first[:@type]

d = Dummy2.new
d.configure(config_element('ROOT', '', {}, [config_element('storage', '', {}, [])]))
Expand Down Expand Up @@ -130,7 +131,7 @@ class Dummy2 < Fluent::Plugin::TestBase
d = Dummy2.new
d.configure(config_element())
assert_raise Fluent::ConfigError.new("@type is required in <storage>") do
d.storage_create(conf: config_element('storage', '', {}), default_type: 'ex2')
d.storage_create(conf: config_element('storage', 'foo', {}), default_type: 'ex2')
end
end

Expand All @@ -139,7 +140,7 @@ class Dummy2 < Fluent::Plugin::TestBase
assert_nothing_raised do
d.configure(config_element())
end
assert_equal 0, d._storages.size
assert_equal 1, d._storages.size
end

test 'can be configured with a storage section' do
Expand Down

0 comments on commit feb2a6e

Please sign in to comment.