From 2c461a1f14cc19e6fc1371482ff071c392cf826e Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Mon, 16 May 2016 14:06:33 +0900 Subject: [PATCH] Merge pull request #951 from fluent/tail_path Add tail_path option to in_tail plugin --- lib/fluent/plugin/in_tail.rb | 12 ++-- test/plugin/test_in_tail.rb | 119 ++++++++++++++++++++++++++++++++++- 2 files changed, 126 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 2809acf552..78673f1803 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -58,6 +58,8 @@ def initialize raise ConfigError, e.message end end + desc 'Add the log path being tailed to records. Specify the field name to be used.' + config_param :path_key, :string, default: nil attr_reader :paths @@ -238,6 +240,7 @@ def flush_buffer(tw) else @tag end + record[@path_key] ||= tw.path unless @path_key.nil? router.emit(tag, time, record) else log.warn "got incomplete line at shutdown from #{tw.path}: #{lb.inspect}" @@ -275,12 +278,13 @@ def receive_lines(lines, tail_watcher) return true end - def convert_line_to_event(line, es) + def convert_line_to_event(line, es, tail_watcher) begin line.chomp! # remove \n line.force_encoding(@encoding) if @encoding @parser.parse(line) { |time, record| if time && record + record[@path_key] ||= tail_watcher.path unless @path_key.nil? es.add(time, record) else log.warn "pattern not match: #{line.inspect}" @@ -295,7 +299,7 @@ def convert_line_to_event(line, es) def parse_singleline(lines, tail_watcher) es = MultiEventStream.new lines.each { |line| - convert_line_to_event(line, es) + convert_line_to_event(line, es, tail_watcher) } es end @@ -308,7 +312,7 @@ def parse_multilines(lines, tail_watcher) lines.each { |line| if @parser.firstline?(line) if lb - convert_line_to_event(lb, es) + convert_line_to_event(lb, es, tail_watcher) end lb = line else @@ -325,7 +329,7 @@ def parse_multilines(lines, tail_watcher) lb << line @parser.parse(lb) { |time, record| if time && record - convert_line_to_event(lb, es) + convert_line_to_event(lb, es, tail_watcher) lb = '' end } diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 9a528aed77..5325f29d95 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -1,7 +1,7 @@ require_relative '../helper' require 'fluent/test' require 'net/http' -require 'flexmock' +require 'flexmock/test_unit' class TailInputTest < Test::Unit::TestCase include FlexMock::TestCase @@ -739,4 +739,121 @@ def execute_test(error_class, error_message) d.emits end end + + sub_test_case "tail_path" do + def test_tail_path_with_singleline + File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + f.puts "test1" + f.puts "test2" + } + + d = create_driver(%[path_key path] + SINGLE_LINE_CONFIG) + + d.run do + sleep 1 + + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + f.puts "test3" + f.puts "test4" + } + sleep 1 + end + + emits = d.emits + assert_equal(true, emits.length > 0) + emits.each do |emit| + assert_equal("#{TMP_DIR}/tail.txt", emit[2]["path"]) + end + end + + def test_tail_path_with_multiline_with_firstline + File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + + d = create_driver %[ + path_key path + format multiline + format1 /^s (?[^\\n]+)(\\nf (?[^\\n]+))?(\\nf (?.*))?/ + format_firstline /^[s]/ + ] + d.run do + File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + f.puts "f test1" + f.puts "s test2" + f.puts "f test3" + f.puts "f test4" + f.puts "s test5" + f.puts "s test6" + f.puts "f test7" + f.puts "s test8" + } + sleep 1 + end + + emits = d.emits + assert(emits.length == 4) + emits.each do |emit| + assert_equal("#{TMP_DIR}/tail.txt", emit[2]["path"]) + end + end + + def test_tail_path_with_multiline_without_firstline + File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + + d = create_driver %[ + path_key path + format multiline + format1 /(?foo \\d)\\n/ + format2 /(?bar \\d)\\n/ + format3 /(?baz \\d)/ + ] + d.run do + File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + f.puts "foo 1" + f.puts "bar 1" + f.puts "baz 1" + } + sleep 1 + end + + emits = d.emits + assert(emits.length > 0) + emits.each do |emit| + assert_equal("#{TMP_DIR}/tail.txt", emit[2]["path"]) + end + end + + def test_tail_path_with_multiline_with_multiple_paths + files = ["#{TMP_DIR}/tail1.txt", "#{TMP_DIR}/tail2.txt"] + files.each { |file| File.open(file, "wb") { |f| } } + + d = create_driver(%[ + path #{files[0]},#{files[1]} + path_key path + tag t1 + format multiline + format1 /^[s|f] (?.*)/ + format_firstline /^[s]/ + ], false) + d.run do + files.each do |file| + File.open(file, 'ab') { |f| + f.puts "f #{file} line should be ignored" + f.puts "s test1" + f.puts "f test2" + f.puts "f test3" + f.puts "s test4" + } + end + sleep 1 + end + + emits = d.emits + assert(emits.length == 4) + assert_equal("#{TMP_DIR}/tail1.txt", emits[0][2]["path"]) + assert_equal("#{TMP_DIR}/tail2.txt", emits[1][2]["path"]) + # "test4" events are here because these events are flushed at shutdown phase + assert_equal("#{TMP_DIR}/tail1.txt", emits[2][2]["path"]) + assert_equal("#{TMP_DIR}/tail2.txt", emits[3][2]["path"]) + end + end end