Skip to content

Commit

Permalink
use plugin root dir as storage directory if available, and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed Dec 19, 2016
1 parent feb2a6e commit b32781d
Show file tree
Hide file tree
Showing 2 changed files with 304 additions and 15 deletions.
29 changes: 16 additions & 13 deletions lib/fluent/plugin/storage_local.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@ class LocalStorage < Storage
DEFAULT_FILE_MODE = 0644

config_param :path, :string, default: nil
config_param :mode, :integer, default: DEFAULT_FILE_MODE
config_param :dir_mode, :integer, default: DEFAULT_DIR_MODE
config_param :mode, default: DEFAULT_FILE_MODE do |v|
v.to_i(8)
end
config_param :dir_mode, default: DEFAULT_DIR_MODE do |v|
v.to_i(8)
end
config_param :pretty_print, :bool, default: false

attr_reader :store # for test

def initialize
super
@store = {}
Expand All @@ -42,9 +48,13 @@ def configure(conf)
super

@on_memory = false
if !@path && !@_plugin_id_configured
if @path
# use it
elsif root_dir = owner.plugin_root_dir
@path = File.join(root_dir, 'storage.json')
else
if @persistent
raise Fluent::ConfigError, "Plugin @id or path for <storage> required to save data"
raise Fluent::ConfigError, "Plugin @id or path for <storage> required when 'persistent' is true"
else
if @autosave
log.warn "both of Plugin @id and path for <storage> are not specified. Using on-memory store."
Expand All @@ -53,18 +63,11 @@ def configure(conf)
end
@on_memory = true
end
elsif @path
# ok
else # @_plugin_id_configured is true
log.warn "path for <storage> is not specified. Using on-memory store temporarily, but will use file store after support global storage path"
@on_memory = true
## TODO: get process-wide directory for plugin storage, and generate path for this plugin storage instance
# path =
end

if !@on_memory
dir = File.dirname(@path)
FileUtils.mkdir_p(dir, mode: @dir_mode) unless File.exist?(dir)
FileUtils.mkdir_p(dir, mode: @dir_mode) unless Dir.exist?(dir)
if File.exist?(@path)
raise Fluent::ConfigError, "Plugin storage path '#{@path}' is not readable/writable" unless File.readable?(@path) && File.writable?(@path)
begin
Expand All @@ -75,7 +78,7 @@ def configure(conf)
raise Fluent::ConfigError, "Unexpected error: failed to read data from plugin storage file: '#{@path}'"
end
else
raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{dir}'" unless File.writable?(dir)
raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{@path}'" unless File.stat(dir).writable?
end
end
end
Expand Down
290 changes: 288 additions & 2 deletions test/plugin/test_storage_local.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,294 @@
require_relative '../helper'
require 'fluent/plugin/storage_local'
require 'fluent/plugin/input'
require 'fluent/system_config'
require 'fileutils'

class LocalStorageTest < Test::Unit::TestCase
test 'syntax' do
assert true
TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/tmp/storage_local#{ENV['TEST_ENV_NUMBER']}")

class MyInput < Fluent::Plugin::Input
helpers :storage
config_section :storage do
config_set_default :@type, 'local'
end
end

setup do
FileUtils.rm_rf(TMP_DIR)
FileUtils.mkdir_p(TMP_DIR)
Fluent::Test.setup
@d = MyInput.new
end

teardown do
@d.stop unless @d.stopped?
@d.before_shutdown unless @d.before_shutdown?
@d.shutdown unless @d.shutdown?
@d.after_shutdown unless @d.after_shutdown?
@d.close unless @d.closed?
@d.terminate unless @d.terminated?
end

sub_test_case 'without any configuration' do
test 'works as on-memory storage' do
conf = config_element()

@d.configure(conf)
@d.start
@p = @d.storage_create()

assert_nil @p.path
assert @p.store.empty?

assert_nil @p.get('key1')
assert_equal 'EMPTY', @p.fetch('key1', 'EMPTY')

@p.put('key1', '1')
assert_equal '1', @p.get('key1')

@p.update('key1') do |v|
(v.to_i * 2).to_s
end
assert_equal '2', @p.get('key1')

@p.save # on-memory storage does nothing...

@d.stop; @d.before_shutdown; @d.shutdown; @d.after_shutdown; @d.close; @d.terminate

# re-create to reload storage contents
@d = MyInput.new
@d.configure(conf)
@d.start
@p = @d.storage_create()

assert @p.store.empty?
end

test 'warns about on-memory storage if autosave is true' do
@d.configure(config_element())
@d.start
@p = @d.storage_create()

logs = @d.log.out.logs
assert{ logs.any?{|log| log.include?("[warn]: both of Plugin @id and path for <storage> are not specified. Using on-memory store.") } }
end

test 'show info log about on-memory storage if autosave is false' do
@d.configure(config_element('ROOT', '', {}, [config_element('storage', '', {'autosave' => 'false'})]))
@d.start
@p = @d.storage_create()

logs = @d.log.out.logs
assert{ logs.any?{|log| log.include?("[info]: both of Plugin @id and path for <storage> are not specified. Using on-memory store.") } }
end
end

sub_test_case 'configured with path' do
test 'works as storage which stores data on disk' do
storage_path = File.join(TMP_DIR, 'my_store.json')
conf = config_element('ROOT', '', {}, [config_element('storage', '', {'path' => storage_path})])
@d.configure(conf)
@d.start
@p = @d.storage_create()

assert_equal storage_path, @p.path
assert @p.store.empty?

assert_nil @p.get('key1')
assert_equal 'EMPTY', @p.fetch('key1', 'EMPTY')

@p.put('key1', '1')
assert_equal '1', @p.get('key1')

@p.update('key1') do |v|
(v.to_i * 2).to_s
end
assert_equal '2', @p.get('key1')

@p.save # stores all data into file

assert File.exist?(storage_path)

@p.put('key2', 4)

@d.stop; @d.before_shutdown; @d.shutdown; @d.after_shutdown; @d.close; @d.terminate

assert_equal({'key1' => '2', 'key2' => 4}, File.open(storage_path){|f| JSON.parse(f.read) })

# re-create to reload storage contents
@d = MyInput.new
@d.configure(conf)
@d.start
@p = @d.storage_create()

assert_false @p.store.empty?

assert_equal '2', @p.get('key1')
assert_equal 4, @p.get('key2')
end
end

sub_test_case 'configured with root-dir and plugin id' do
test 'works as storage which stores data under root dir' do
root_dir = File.join(TMP_DIR, 'root')
expected_storage_path = File.join(root_dir, 'worker0', 'local_storage_test', 'storage.json')
conf = config_element('ROOT', '', {'@id' => 'local_storage_test'})
Fluent::SystemConfig.overwrite_system_config('root_dir' => root_dir) do
@d.configure(conf)
end
@d.start
@p = @d.storage_create()

assert_equal expected_storage_path, @p.path
assert @p.store.empty?

assert_nil @p.get('key1')
assert_equal 'EMPTY', @p.fetch('key1', 'EMPTY')

@p.put('key1', '1')
assert_equal '1', @p.get('key1')

@p.update('key1') do |v|
(v.to_i * 2).to_s
end
assert_equal '2', @p.get('key1')

@p.save # stores all data into file

assert File.exist?(expected_storage_path)

@p.put('key2', 4)

@d.stop; @d.before_shutdown; @d.shutdown; @d.after_shutdown; @d.close; @d.terminate

assert_equal({'key1' => '2', 'key2' => 4}, File.open(expected_storage_path){|f| JSON.parse(f.read) })

# re-create to reload storage contents
@d = MyInput.new
Fluent::SystemConfig.overwrite_system_config('root_dir' => root_dir) do
@d.configure(conf)
end
@d.start
@p = @d.storage_create()

assert_false @p.store.empty?

assert_equal '2', @p.get('key1')
assert_equal 4, @p.get('key2')
end
end

sub_test_case 'persistent specified' do
test 'works well with path' do
omit "It's hard to test on Windows" if Fluent.windows?

storage_path = File.join(TMP_DIR, 'my_store.json')
conf = config_element('ROOT', '', {}, [config_element('storage', '', {'path' => storage_path, 'persistent' => 'true'})])
@d.configure(conf)
@d.start
@p = @d.storage_create()

assert_equal storage_path, @p.path
assert @p.store.empty?

assert_nil @p.get('key1')
assert_equal 'EMPTY', @p.fetch('key1', 'EMPTY')

@p.put('key1', '1')
assert_equal({'key1' => '1'}, File.open(storage_path){|f| JSON.parse(f.read) })

@p.update('key1') do |v|
(v.to_i * 2).to_s
end
assert_equal({'key1' => '2'}, File.open(storage_path){|f| JSON.parse(f.read) })
end

test 'works well with root-dir and plugin id' do
omit "It's hard to test on Windows" if Fluent.windows?

root_dir = File.join(TMP_DIR, 'root')
expected_storage_path = File.join(root_dir, 'worker0', 'local_storage_test', 'storage.json')
conf = config_element('ROOT', '', {'@id' => 'local_storage_test'}, [config_element('storage', '', {'persistent' => 'true'})])
Fluent::SystemConfig.overwrite_system_config('root_dir' => root_dir) do
@d.configure(conf)
end
@d.start
@p = @d.storage_create()

assert_equal expected_storage_path, @p.path
assert @p.store.empty?

assert_nil @p.get('key1')
assert_equal 'EMPTY', @p.fetch('key1', 'EMPTY')

@p.put('key1', '1')
assert_equal({'key1' => '1'}, File.open(expected_storage_path){|f| JSON.parse(f.read) })

@p.update('key1') do |v|
(v.to_i * 2).to_s
end
assert_equal({'key1' => '2'}, File.open(expected_storage_path){|f| JSON.parse(f.read) })
end

test 'raises error if it is configured to use on-memory storage' do
assert_raise Fluent::ConfigError.new("Plugin @id or path for <storage> required when 'persistent' is true") do
@d.configure(config_element('ROOT', '', {}, [config_element('storage', '', {'persistent' => 'true'})]))
end
end
end

sub_test_case 'with various configurations' do
test 'mode and dir_mode controls permissions of stored data' do
storage_path = File.join(TMP_DIR, 'dir', 'my_store.json')
storage_conf = {
'path' => storage_path,
'mode' => '0600',
'dir_mode' => '0777',
}
conf = config_element('ROOT', '', {}, [config_element('storage', '', storage_conf)])
@d.configure(conf)
@d.start
@p = @d.storage_create()

assert_equal storage_path, @p.path
assert @p.store.empty?

@p.put('key1', '1')
assert_equal '1', @p.get('key1')

@p.save # stores all data into file

assert File.exist?(storage_path)
assert_equal 0600, (File.stat(storage_path).mode % 01000)
assert_equal 0777, (File.stat(File.dirname(storage_path)).mode % 01000)
end

test 'pretty_print controls to write data in files as human-easy-to-read' do
storage_path = File.join(TMP_DIR, 'dir', 'my_store.json')
storage_conf = {
'path' => storage_path,
'pretty_print' => 'true',
}
conf = config_element('ROOT', '', {}, [config_element('storage', '', storage_conf)])
@d.configure(conf)
@d.start
@p = @d.storage_create()

assert_equal storage_path, @p.path
assert @p.store.empty?

@p.put('key1', '1')
assert_equal '1', @p.get('key1')

@p.save # stores all data into file

expected_pp_json = <<JSON.chomp
{
"key1": "1"
}
JSON
assert_equal expected_pp_json, File.read(storage_path)
end
end
end

0 comments on commit b32781d

Please sign in to comment.