diff --git a/lib/fluent/plugin/storage_local.rb b/lib/fluent/plugin/storage_local.rb index a431453f9b..326a162152 100644 --- a/lib/fluent/plugin/storage_local.rb +++ b/lib/fluent/plugin/storage_local.rb @@ -29,10 +29,16 @@ class LocalStorage < Storage DEFAULT_FILE_MODE = 0644 config_param :path, :string, default: nil - config_param :mode, :integer, default: DEFAULT_FILE_MODE - config_param :dir_mode, :integer, default: DEFAULT_DIR_MODE + config_param :mode, default: DEFAULT_FILE_MODE do |v| + v.to_i(8) + end + config_param :dir_mode, default: DEFAULT_DIR_MODE do |v| + v.to_i(8) + end config_param :pretty_print, :bool, default: false + attr_reader :store # for test + def initialize super @store = {} @@ -42,9 +48,13 @@ def configure(conf) super @on_memory = false - if !@path && !@_plugin_id_configured + if @path + # use it + elsif root_dir = owner.plugin_root_dir + @path = File.join(root_dir, 'storage.json') + else if @persistent - raise Fluent::ConfigError, "Plugin @id or path for required to save data" + raise Fluent::ConfigError, "Plugin @id or path for required when 'persistent' is true" else if @autosave log.warn "both of Plugin @id and path for are not specified. Using on-memory store." @@ -53,18 +63,11 @@ def configure(conf) end @on_memory = true end - elsif @path - # ok - else # @_plugin_id_configured is true - log.warn "path for is not specified. Using on-memory store temporarily, but will use file store after support global storage path" - @on_memory = true - ## TODO: get process-wide directory for plugin storage, and generate path for this plugin storage instance - # path = end if !@on_memory dir = File.dirname(@path) - FileUtils.mkdir_p(dir, mode: @dir_mode) unless File.exist?(dir) + FileUtils.mkdir_p(dir, mode: @dir_mode) unless Dir.exist?(dir) if File.exist?(@path) raise Fluent::ConfigError, "Plugin storage path '#{@path}' is not readable/writable" unless File.readable?(@path) && File.writable?(@path) begin @@ -75,7 +78,7 @@ def configure(conf) raise Fluent::ConfigError, "Unexpected error: failed to read data from plugin storage file: '#{@path}'" end else - raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{dir}'" unless File.writable?(dir) + raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{@path}'" unless File.stat(dir).writable? end end end diff --git a/test/plugin/test_storage_local.rb b/test/plugin/test_storage_local.rb index f125234347..9cf8df41f0 100644 --- a/test/plugin/test_storage_local.rb +++ b/test/plugin/test_storage_local.rb @@ -1,8 +1,294 @@ require_relative '../helper' require 'fluent/plugin/storage_local' +require 'fluent/plugin/input' +require 'fluent/system_config' +require 'fileutils' class LocalStorageTest < Test::Unit::TestCase - test 'syntax' do - assert true + TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/tmp/storage_local#{ENV['TEST_ENV_NUMBER']}") + + class MyInput < Fluent::Plugin::Input + helpers :storage + config_section :storage do + config_set_default :@type, 'local' + end + end + + setup do + FileUtils.rm_rf(TMP_DIR) + FileUtils.mkdir_p(TMP_DIR) + Fluent::Test.setup + @d = MyInput.new + end + + teardown do + @d.stop unless @d.stopped? + @d.before_shutdown unless @d.before_shutdown? + @d.shutdown unless @d.shutdown? + @d.after_shutdown unless @d.after_shutdown? + @d.close unless @d.closed? + @d.terminate unless @d.terminated? + end + + sub_test_case 'without any configuration' do + test 'works as on-memory storage' do + conf = config_element() + + @d.configure(conf) + @d.start + @p = @d.storage_create() + + assert_nil @p.path + assert @p.store.empty? + + assert_nil @p.get('key1') + assert_equal 'EMPTY', @p.fetch('key1', 'EMPTY') + + @p.put('key1', '1') + assert_equal '1', @p.get('key1') + + @p.update('key1') do |v| + (v.to_i * 2).to_s + end + assert_equal '2', @p.get('key1') + + @p.save # on-memory storage does nothing... + + @d.stop; @d.before_shutdown; @d.shutdown; @d.after_shutdown; @d.close; @d.terminate + + # re-create to reload storage contents + @d = MyInput.new + @d.configure(conf) + @d.start + @p = @d.storage_create() + + assert @p.store.empty? + end + + test 'warns about on-memory storage if autosave is true' do + @d.configure(config_element()) + @d.start + @p = @d.storage_create() + + logs = @d.log.out.logs + assert{ logs.any?{|log| log.include?("[warn]: both of Plugin @id and path for are not specified. Using on-memory store.") } } + end + + test 'show info log about on-memory storage if autosave is false' do + @d.configure(config_element('ROOT', '', {}, [config_element('storage', '', {'autosave' => 'false'})])) + @d.start + @p = @d.storage_create() + + logs = @d.log.out.logs + assert{ logs.any?{|log| log.include?("[info]: both of Plugin @id and path for are not specified. Using on-memory store.") } } + end + end + + sub_test_case 'configured with path' do + test 'works as storage which stores data on disk' do + storage_path = File.join(TMP_DIR, 'my_store.json') + conf = config_element('ROOT', '', {}, [config_element('storage', '', {'path' => storage_path})]) + @d.configure(conf) + @d.start + @p = @d.storage_create() + + assert_equal storage_path, @p.path + assert @p.store.empty? + + assert_nil @p.get('key1') + assert_equal 'EMPTY', @p.fetch('key1', 'EMPTY') + + @p.put('key1', '1') + assert_equal '1', @p.get('key1') + + @p.update('key1') do |v| + (v.to_i * 2).to_s + end + assert_equal '2', @p.get('key1') + + @p.save # stores all data into file + + assert File.exist?(storage_path) + + @p.put('key2', 4) + + @d.stop; @d.before_shutdown; @d.shutdown; @d.after_shutdown; @d.close; @d.terminate + + assert_equal({'key1' => '2', 'key2' => 4}, File.open(storage_path){|f| JSON.parse(f.read) }) + + # re-create to reload storage contents + @d = MyInput.new + @d.configure(conf) + @d.start + @p = @d.storage_create() + + assert_false @p.store.empty? + + assert_equal '2', @p.get('key1') + assert_equal 4, @p.get('key2') + end + end + + sub_test_case 'configured with root-dir and plugin id' do + test 'works as storage which stores data under root dir' do + root_dir = File.join(TMP_DIR, 'root') + expected_storage_path = File.join(root_dir, 'worker0', 'local_storage_test', 'storage.json') + conf = config_element('ROOT', '', {'@id' => 'local_storage_test'}) + Fluent::SystemConfig.overwrite_system_config('root_dir' => root_dir) do + @d.configure(conf) + end + @d.start + @p = @d.storage_create() + + assert_equal expected_storage_path, @p.path + assert @p.store.empty? + + assert_nil @p.get('key1') + assert_equal 'EMPTY', @p.fetch('key1', 'EMPTY') + + @p.put('key1', '1') + assert_equal '1', @p.get('key1') + + @p.update('key1') do |v| + (v.to_i * 2).to_s + end + assert_equal '2', @p.get('key1') + + @p.save # stores all data into file + + assert File.exist?(expected_storage_path) + + @p.put('key2', 4) + + @d.stop; @d.before_shutdown; @d.shutdown; @d.after_shutdown; @d.close; @d.terminate + + assert_equal({'key1' => '2', 'key2' => 4}, File.open(expected_storage_path){|f| JSON.parse(f.read) }) + + # re-create to reload storage contents + @d = MyInput.new + Fluent::SystemConfig.overwrite_system_config('root_dir' => root_dir) do + @d.configure(conf) + end + @d.start + @p = @d.storage_create() + + assert_false @p.store.empty? + + assert_equal '2', @p.get('key1') + assert_equal 4, @p.get('key2') + end + end + + sub_test_case 'persistent specified' do + test 'works well with path' do + omit "It's hard to test on Windows" if Fluent.windows? + + storage_path = File.join(TMP_DIR, 'my_store.json') + conf = config_element('ROOT', '', {}, [config_element('storage', '', {'path' => storage_path, 'persistent' => 'true'})]) + @d.configure(conf) + @d.start + @p = @d.storage_create() + + assert_equal storage_path, @p.path + assert @p.store.empty? + + assert_nil @p.get('key1') + assert_equal 'EMPTY', @p.fetch('key1', 'EMPTY') + + @p.put('key1', '1') + assert_equal({'key1' => '1'}, File.open(storage_path){|f| JSON.parse(f.read) }) + + @p.update('key1') do |v| + (v.to_i * 2).to_s + end + assert_equal({'key1' => '2'}, File.open(storage_path){|f| JSON.parse(f.read) }) + end + + test 'works well with root-dir and plugin id' do + omit "It's hard to test on Windows" if Fluent.windows? + + root_dir = File.join(TMP_DIR, 'root') + expected_storage_path = File.join(root_dir, 'worker0', 'local_storage_test', 'storage.json') + conf = config_element('ROOT', '', {'@id' => 'local_storage_test'}, [config_element('storage', '', {'persistent' => 'true'})]) + Fluent::SystemConfig.overwrite_system_config('root_dir' => root_dir) do + @d.configure(conf) + end + @d.start + @p = @d.storage_create() + + assert_equal expected_storage_path, @p.path + assert @p.store.empty? + + assert_nil @p.get('key1') + assert_equal 'EMPTY', @p.fetch('key1', 'EMPTY') + + @p.put('key1', '1') + assert_equal({'key1' => '1'}, File.open(expected_storage_path){|f| JSON.parse(f.read) }) + + @p.update('key1') do |v| + (v.to_i * 2).to_s + end + assert_equal({'key1' => '2'}, File.open(expected_storage_path){|f| JSON.parse(f.read) }) + end + + test 'raises error if it is configured to use on-memory storage' do + assert_raise Fluent::ConfigError.new("Plugin @id or path for required when 'persistent' is true") do + @d.configure(config_element('ROOT', '', {}, [config_element('storage', '', {'persistent' => 'true'})])) + end + end + end + + sub_test_case 'with various configurations' do + test 'mode and dir_mode controls permissions of stored data' do + storage_path = File.join(TMP_DIR, 'dir', 'my_store.json') + storage_conf = { + 'path' => storage_path, + 'mode' => '0600', + 'dir_mode' => '0777', + } + conf = config_element('ROOT', '', {}, [config_element('storage', '', storage_conf)]) + @d.configure(conf) + @d.start + @p = @d.storage_create() + + assert_equal storage_path, @p.path + assert @p.store.empty? + + @p.put('key1', '1') + assert_equal '1', @p.get('key1') + + @p.save # stores all data into file + + assert File.exist?(storage_path) + assert_equal 0600, (File.stat(storage_path).mode % 01000) + assert_equal 0777, (File.stat(File.dirname(storage_path)).mode % 01000) + end + + test 'pretty_print controls to write data in files as human-easy-to-read' do + storage_path = File.join(TMP_DIR, 'dir', 'my_store.json') + storage_conf = { + 'path' => storage_path, + 'pretty_print' => 'true', + } + conf = config_element('ROOT', '', {}, [config_element('storage', '', storage_conf)]) + @d.configure(conf) + @d.start + @p = @d.storage_create() + + assert_equal storage_path, @p.path + assert @p.store.empty? + + @p.put('key1', '1') + assert_equal '1', @p.get('key1') + + @p.save # stores all data into file + + expected_pp_json = <