Skip to content

Commit

Permalink
Merge pull request #1775 from fluent/in_tail-add-enable-stat-watcher-…
Browse files Browse the repository at this point in the history
…option

in_tail: Add enable_stat_watcher option to disable inotify events
  • Loading branch information
repeatedly authored Dec 6, 2017
2 parents 74e2b13 + 2e47989 commit e8a1127
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 8 deletions.
19 changes: 13 additions & 6 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ def initialize
config_param :emit_unmatched_lines, :bool, default: false
desc 'Enable the additional watch timer.'
config_param :enable_watch_timer, :bool, default: true
desc 'Enable the stat watcher based on inotify.'
config_param :enable_stat_watcher, :bool, default: true
desc 'The encoding after conversion of the input.'
config_param :encoding, :string, default: nil
desc 'The encoding of the input.'
Expand Down Expand Up @@ -115,6 +117,10 @@ def configure(conf)

super

if !@enable_watch_timer && !@enable_stat_watcher
raise Fluent::ConfigError, "either of enable_watch_timer or enable_stat_watcher must be true"
end

@paths = @path.split(',').map {|path| path.strip }
if @paths.empty?
raise Fluent::ConfigError, "tail: 'path' parameter is required on tail input"
Expand Down Expand Up @@ -254,10 +260,10 @@ def refresh_watchers

def setup_watcher(path, pe)
line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines))
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines))
tw.attach do |watcher|
watcher.timer_trigger = timer_execute(:in_tail_timer_trigger, 1, &watcher.method(:on_notify)) if watcher.enable_watch_timer
event_loop_attach(watcher.stat_trigger)
event_loop_attach(watcher.stat_trigger) if watcher.enable_stat_watcher
end
tw
rescue => e
Expand Down Expand Up @@ -457,17 +463,18 @@ def parse_multilines(lines, tail_watcher)
end

class TailWatcher
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, enable_stat_watcher, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
@path = path
@rotate_wait = rotate_wait
@pe = pe || MemoryPositionEntry.new
@read_from_head = read_from_head
@enable_watch_timer = enable_watch_timer
@enable_stat_watcher = enable_stat_watcher
@read_lines_limit = read_lines_limit
@receive_lines = receive_lines
@update_watcher = update_watcher

@stat_trigger = StatWatcher.new(self, &method(:on_notify))
@stat_trigger = @enable_stat_watcher ? StatWatcher.new(self, &method(:on_notify)) : nil
@timer_trigger = nil

@rotate_handler = RotateHandler.new(self, &method(:on_rotate))
Expand All @@ -483,7 +490,7 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, r
attr_reader :path
attr_reader :log, :pe, :read_lines_limit, :open_on_every_update
attr_reader :from_encoding, :encoding
attr_reader :stat_trigger, :enable_watch_timer
attr_reader :stat_trigger, :enable_watch_timer, :enable_stat_watcher
attr_accessor :timer_trigger
attr_accessor :line_buffer, :line_buffer_timer_flusher
attr_accessor :unwatched # This is used for removing position entry from PositionFile
Expand All @@ -503,7 +510,7 @@ def attach

def detach
@timer_trigger.detach if @enable_watch_timer && @timer_trigger && @timer_trigger.attached?
@stat_trigger.detach if @stat_trigger && @stat_trigger.attached?
@stat_trigger.detach if @enable_stat_watcher && @stat_trigger && @stat_trigger.attached?
@io_handler.on_notify if @io_handler
end

Expand Down
35 changes: 33 additions & 2 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def cleanup_directory(path)
COMMON_CONFIG = CONFIG + config_element("", "", { "pos_file" => "#{TMP_DIR}/tail.pos" })
CONFIG_READ_FROM_HEAD = config_element("", "", { "read_from_head" => true })
CONFIG_ENABLE_WATCH_TIMER = config_element("", "", { "enable_watch_timer" => false })
CONFIG_DISABLE_STAT_WATCHER = config_element("", "", { "enable_stat_watcher" => false })
CONFIG_OPEN_ON_EVERY_UPDATE = config_element("", "", { "open_on_every_update" => true })
SINGLE_LINE_CONFIG = config_element("", "", { "format" => "/(?<message>.*)/" })
PARSE_SINGLE_LINE_CONFIG = config_element("", "", {}, [config_element("parse", "", { "@type" => "/(?<message>.*)/" })])
Expand Down Expand Up @@ -83,6 +84,12 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)
end
end

test "both enable_watch_timer and enable_stat_watcher are false" do
assert_raise(Fluent::ConfigError) do
create_driver(CONFIG_ENABLE_WATCH_TIMER + CONFIG_DISABLE_STAT_WATCHER + PARSE_SINGLE_LINE_CONFIG)
end
end

sub_test_case "encoding" do
test "valid" do
conf = SINGLE_LINE_CONFIG + config_element("", "", { "encoding" => "utf-8" })
Expand Down Expand Up @@ -256,6 +263,30 @@ def test_emit_with_enable_watch_timer(data)
assert_equal({"message" => "test3"}, events[0][2])
assert_equal({"message" => "test4"}, events[1][2])
end

data(flat: CONFIG_DISABLE_STAT_WATCHER + SINGLE_LINE_CONFIG,
parse: CONFIG_DISABLE_STAT_WATCHER + PARSE_SINGLE_LINE_CONFIG)
def test_emit_with_disable_stat_watcher(data)
config = data
File.open("#{TMP_DIR}/tail.txt", "wb") {|f|
f.puts "test1"
f.puts "test2"
}

d = create_driver(config)

d.run(expect_emits: 1) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
f.puts "test3"
f.puts "test4"
}
end

events = d.events
assert(events.length > 0)
assert_equal({"message" => "test3"}, events[0][2])
assert_equal({"message" => "test4"}, events[1][2])
end
end

class TestWithSystem < self
Expand Down Expand Up @@ -926,7 +957,7 @@ def test_z_refresh_watchers
Timecop.freeze(2010, 1, 2, 3, 4, 5) do
flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass|
EX_PATHS.each do |path|
watcherclass.should_receive(:new).with(path, EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, 1000, any, any, any, any, any, any).once.and_return do
watcherclass.should_receive(:new).with(path, EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, any, any, any, any, any, any).once.and_return do
flexmock('TailWatcher') { |watcher|
watcher.should_receive(:attach).once
watcher.should_receive(:unwatched=).zero_or_more_times
Expand All @@ -944,7 +975,7 @@ def test_z_refresh_watchers

Timecop.freeze(2010, 1, 2, 3, 4, 6) do
flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass|
watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, 1000, any, any, any, any, any, any).once.and_return do
watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, any, any, any, any, any, any).once.and_return do
flexmock('TailWatcher') do |watcher|
watcher.should_receive(:attach).once
watcher.should_receive(:unwatched=).zero_or_more_times
Expand Down

0 comments on commit e8a1127

Please sign in to comment.