From 06d176bd00f345d227a7f34d0aa86bd874cf3dc7 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Fri, 13 Jun 2014 05:58:40 +0900 Subject: [PATCH] Support read_from_head without pos_file option Including InputTestDriver fix and adding more tests from pull #247 --- lib/fluent/plugin/in_tail.rb | 7 +- lib/fluent/test/input_test.rb | 7 +- test/plugin/test_in_tail.rb | 116 ++++++++++++++++++++++++++++++---- 3 files changed, 109 insertions(+), 21 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 6fe5451286..23f5967178 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -129,7 +129,7 @@ def refresh_watchers end def setup_watcher(path, pe) - tw = TailWatcher.new(path, @rotate_wait, pe, log, method(:update_watcher), &method(:receive_lines)) + tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, method(:update_watcher), &method(:receive_lines)) tw.attach(@loop) tw end @@ -275,10 +275,11 @@ def parse_multilines(lines, tail_watcher) end class TailWatcher - def initialize(path, rotate_wait, pe, log, update_watcher, &receive_lines) + def initialize(path, rotate_wait, pe, log, read_from_head, update_watcher, &receive_lines) @path = path @rotate_wait = rotate_wait @pe = pe || MemoryPositionEntry.new + @read_from_head = read_from_head @receive_lines = receive_lines @update_watcher = update_watcher @@ -354,7 +355,7 @@ def on_rotate(io) # seek to the end of the any files. # logs may duplicate without this seek because it's not sure the file is # existent file or rotated new file. - pos = fsize + pos = @read_from_head ? 0 : fsize @pe.update(inode, pos) end io.seek(pos) diff --git a/lib/fluent/test/input_test.rb b/lib/fluent/test/input_test.rb index b298b7bc63..3fb858eb37 100644 --- a/lib/fluent/test/input_test.rb +++ b/lib/fluent/test/input_test.rb @@ -72,11 +72,10 @@ def records def run(&block) m = method(:emit_stream) + Engine.define_singleton_method(:emit_stream) {|tag,es| + m.call(tag, es) + } super { - Engine.define_singleton_method(:emit_stream) {|tag,es| - m.call(tag, es) - } - block.call if block if @expects diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 2ae7ca9e1f..20c262a80c 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -13,12 +13,17 @@ def setup TMP_DIR = File.dirname(__FILE__) + "/../tmp/tail#{ENV['TEST_ENV_NUMBER']}" - COMMON_CONFIG = %[ + CONFIG = %[ path #{TMP_DIR}/tail.txt tag t1 rotate_wait 2s + ] + COMMON_CONFIG = CONFIG + %[ pos_file #{TMP_DIR}/tail.pos ] + CONFIG_READ_FROM_HEAD = %[ + read_from_head true + ] SINGLE_LINE_CONFIG = %[ format /(?.*)/ ] @@ -36,6 +41,8 @@ def test_configure assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file end + # TODO: Should using more better approach instead of sleep wait + def test_emit File.open("#{TMP_DIR}/tail.txt", "w") {|f| f.puts "test1" @@ -56,8 +63,89 @@ def test_emit emits = d.emits assert_equal(true, emits.length > 0) - assert_equal({"message"=>"test3"}, emits[0][2]) - assert_equal({"message"=>"test4"}, emits[1][2]) + assert_equal({"message" => "test3"}, emits[0][2]) + assert_equal({"message" => "test4"}, emits[1][2]) + end + + def test_emit_with_read_from_head + File.open("#{TMP_DIR}/tail.txt", "w") {|f| + f.puts "test1" + f.puts "test2" + } + + d = create_driver(CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG) + + d.run do + sleep 1 + + File.open("#{TMP_DIR}/tail.txt", "a") {|f| + f.puts "test3" + f.puts "test4" + } + sleep 1 + end + + emits = d.emits + assert(emits.length > 0) + assert_equal({"message" => "test1"}, emits[0][2]) + assert_equal({"message" => "test2"}, emits[1][2]) + assert_equal({"message" => "test3"}, emits[2][2]) + assert_equal({"message" => "test4"}, emits[3][2]) + end + + def test_rotate_file + emits = sub_test_rotate_file(SINGLE_LINE_CONFIG) + assert(emits.length > 0) + assert_equal({"message" => "test3"}, emits[0][2]) + assert_equal({"message" => "test4"}, emits[1][2]) + assert_equal({"message" => "test5"}, emits[2][2]) + assert_equal({"message" => "test6"}, emits[3][2]) + end + + def test_rotate_file_with_read_from_head + emits = sub_test_rotate_file(CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG) + assert(emits.length > 0) + assert_equal({"message" => "test1"}, emits[0][2]) + assert_equal({"message" => "test2"}, emits[1][2]) + assert_equal({"message" => "test3"}, emits[2][2]) + assert_equal({"message" => "test4"}, emits[3][2]) + assert_equal({"message" => "test5"}, emits[4][2]) + assert_equal({"message" => "test6"}, emits[5][2]) + end + + def sub_test_rotate_file(config = nil) + File.open("#{TMP_DIR}/tail.txt", "w") {|f| + f.puts "test1" + f.puts "test2" + } + d = create_driver(config) + d.run do + sleep 1 + + File.open("#{TMP_DIR}/tail.txt", "a") {|f| + f.puts "test3" + f.puts "test4" + } + sleep 1 + + FileUtils.mv("#{TMP_DIR}/tail.txt", "#{TMP_DIR}/tail2.txt") + sleep 1 + + File.open("#{TMP_DIR}/tail.txt", "w") {|f| } + sleep 1 + + File.open("#{TMP_DIR}/tail.txt", "a") {|f| + f.puts "test5" + f.puts "test6" + } + sleep 1 + end + + d.run do + sleep 1 + end + + d.emits end def test_lf @@ -79,7 +167,7 @@ def test_lf emits = d.emits assert_equal(true, emits.length > 0) - assert_equal({"message"=>"test3test4"}, emits[0][2]) + assert_equal({"message" => "test3test4"}, emits[0][2]) end def test_whitespace @@ -103,12 +191,12 @@ def test_whitespace emits = d.emits assert_equal(true, emits.length > 0) - assert_equal({"message"=>" "}, emits[0][2]) - assert_equal({"message"=>" 4 spaces"}, emits[1][2]) - assert_equal({"message"=>"4 spaces "}, emits[2][2]) - assert_equal({"message"=>" "}, emits[3][2]) - assert_equal({"message"=>" tab"}, emits[4][2]) - assert_equal({"message"=>"tab "}, emits[5][2]) + assert_equal({"message" => " "}, emits[0][2]) + assert_equal({"message" => " 4 spaces"}, emits[1][2]) + assert_equal({"message" => "4 spaces "}, emits[2][2]) + assert_equal({"message" => " "}, emits[3][2]) + assert_equal({"message" => " tab"}, emits[4][2]) + assert_equal({"message" => "tab "}, emits[5][2]) end # multiline mode test @@ -247,7 +335,7 @@ def test_refresh_watchers flexstub(Fluent::NewTailInput::TailWatcher) do |watcherclass| EX_PATHS.each do |path| - watcherclass.should_receive(:new).with(path, EX_RORATE_WAIT, Fluent::NewTailInput::FilePositionEntry, any, any, any).once.and_return do + watcherclass.should_receive(:new).with(path, EX_RORATE_WAIT, Fluent::NewTailInput::FilePositionEntry, any, true, any, any).once.and_return do flexmock('TailWatcher') { |watcher| watcher.should_receive(:attach).once watcher.should_receive(:unwatched=).zero_or_more_times @@ -263,7 +351,7 @@ def test_refresh_watchers end flexstub(Fluent::NewTailInput::TailWatcher) do |watcherclass| - watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_RORATE_WAIT, Fluent::NewTailInput::FilePositionEntry, any, any, any).once.and_return do + watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_RORATE_WAIT, Fluent::NewTailInput::FilePositionEntry, any, true, any, any).once.and_return do flexmock('TailWatcher') do |watcher| watcher.should_receive(:attach).once watcher.should_receive(:unwatched=).zero_or_more_times @@ -368,8 +456,8 @@ def test_missing_file end emits = d.emits assert_equal(2, emits.length) - assert_equal({"message"=>"test3"}, emits[0][2]) - assert_equal({"message"=>"test4"}, emits[1][2]) + assert_equal({"message" => "test3"}, emits[0][2]) + assert_equal({"message" => "test4"}, emits[1][2]) end end end