diff --git a/lib/fluent/plugin_helper/storage.rb b/lib/fluent/plugin_helper/storage.rb index 32b8a1f5c7..f3f6d92920 100644 --- a/lib/fluent/plugin_helper/storage.rb +++ b/lib/fluent/plugin_helper/storage.rb @@ -30,6 +30,10 @@ module Storage StorageState = Struct.new(:storage, :running) def storage_create(usage: '', type: nil, conf: nil, default_type: nil) + if conf && !conf.arg.empty? + usage = conf.arg + end + s = @_storages[usage] if s && s.running return s.storage @@ -72,7 +76,7 @@ def storage_create(usage: '', type: nil, conf: nil, default_type: nil) module StorageParams include Fluent::Configurable # minimum section definition to instantiate storage plugin instances - config_section :storage, required: false, multi: true, param_name: :storage_configs do + config_section :storage, required: false, multi: true, param_name: :storage_configs, init: true do config_argument :usage, :string, default: '' config_param :@type, :string, default: Fluent::Plugin::Storage::DEFAULT_TYPE end @@ -194,6 +198,10 @@ def initialize(storage) def_delegators :@storage, :start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate def_delegators :@storage, :started?, :stopped?, :before_shutdown?, :shutdown?, :after_shutdown?, :closed?, :terminated? + def method_missing(name, *args) + @monitor.synchronize{ @storage.__send__(name, *args) } + end + def persistent_always? true end @@ -274,7 +282,7 @@ class SynchronizeWrapper def initialize(storage) @storage = storage - @mutex = Mutex.new + @monitor = Monitor.new end def_delegators :@storage, :persistent, :autosave, :autosave_interval, :save_at_shutdown @@ -282,6 +290,10 @@ def initialize(storage) def_delegators :@storage, :start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate def_delegators :@storage, :started?, :stopped?, :before_shutdown?, :shutdown?, :after_shutdown?, :closed?, :terminated? + def method_missing(name, *args) + @monitor.synchronize{ @storage.__send__(name, *args) } + end + def synchronized? true end @@ -291,35 +303,35 @@ def implementation end def load - @mutex.synchronize do + @monitor.synchronize do @storage.load end end def save - @mutex.synchronize do + @monitor.synchronize do @storage.save end end def get(key) - @mutex.synchronize{ @storage.get(key) } + @monitor.synchronize{ @storage.get(key) } end def fetch(key, defval) - @mutex.synchronize{ @storage.fetch(key, defval) } + @monitor.synchronize{ @storage.fetch(key, defval) } end def put(key, value) - @mutex.synchronize{ @storage.put(key, value) } + @monitor.synchronize{ @storage.put(key, value) } end def delete(key) - @mutex.synchronize{ @storage.delete(key) } + @monitor.synchronize{ @storage.delete(key) } end def update(key, &block) - @mutex.synchronize do + @monitor.synchronize do v = block.call(@storage.get(key)) @storage.put(key, v) v diff --git a/test/plugin_helper/test_storage.rb b/test/plugin_helper/test_storage.rb index ead7fa21ba..7f36329acc 100644 --- a/test/plugin_helper/test_storage.rb +++ b/test/plugin_helper/test_storage.rb @@ -102,7 +102,8 @@ class Dummy2 < Fluent::Plugin::TestBase test 'can override default configuration parameters, but not overwrite whole definition' do d = Dummy.new d.configure(config_element()) - assert_equal [], d.storage_configs + assert_equal 1, d.storage_configs.size + assert_equal 'local', d.storage_configs.first[:@type] d = Dummy2.new d.configure(config_element('ROOT', '', {}, [config_element('storage', '', {}, [])])) @@ -130,7 +131,7 @@ class Dummy2 < Fluent::Plugin::TestBase d = Dummy2.new d.configure(config_element()) assert_raise Fluent::ConfigError.new("@type is required in ") do - d.storage_create(conf: config_element('storage', '', {}), default_type: 'ex2') + d.storage_create(conf: config_element('storage', 'foo', {}), default_type: 'ex2') end end @@ -139,7 +140,7 @@ class Dummy2 < Fluent::Plugin::TestBase assert_nothing_raised do d.configure(config_element()) end - assert_equal 0, d._storages.size + assert_equal 1, d._storages.size end test 'can be configured with a storage section' do