diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 9cbe88385f..ff25e99780 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -37,6 +37,8 @@ class Output < Base CHUNK_KEY_PATTERN = /^[-_.@a-zA-Z0-9]+$/ CHUNK_KEY_PLACEHOLDER_PATTERN = /\$\{[-_.@a-zA-Z0-9]+\}/ + CHUNKING_FIELD_WARN_NUM = 4 + config_param :time_as_integer, :bool, default: false # `` and `` sections are available only when '#format' and '#write' are implemented @@ -253,6 +255,10 @@ def configure(conf) @output_time_formatter_cache = {} end + if (@chunk_key_tag ? 1 : 0) + @chunk_keys.size >= CHUNKING_FIELD_WARN_NUM + log.warn "many chunk keys specified, and it may cause too many chunks on your system." + end + # no chunk keys or only tags (chunking can be done without iterating event stream) @simple_chunking = !@chunk_key_time && @chunk_keys.empty? diff --git a/lib/fluent/test.rb b/lib/fluent/test.rb index 01f6ecbdd5..b60e771642 100644 --- a/lib/fluent/test.rb +++ b/lib/fluent/test.rb @@ -16,6 +16,7 @@ require 'test/unit' require 'fluent/env' # for Fluent.windows? +require 'fluent/test/log' require 'fluent/test/base' require 'fluent/test/input_test' require 'fluent/test/output_test' @@ -28,4 +29,23 @@ dl_opts[:log_level] = ServerEngine::DaemonLogger::INFO logdev = Fluent::Test::DummyLogDevice.new logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) -$log ||= Fluent::Log.new(logger) \ No newline at end of file +$log ||= Fluent::Log.new(logger) + +module Fluent + module Test + def self.setup + Fluent.__send__(:remove_const, :Engine) + engine = Fluent.const_set(:Engine, EngineClass.new).init(SystemConfig.new) + + engine.define_singleton_method(:now=) {|n| + @now = n + } + engine.define_singleton_method(:now) { + @now ||= super() + } + + nil + end + end +end + diff --git a/lib/fluent/test/base.rb b/lib/fluent/test/base.rb index 57305538c9..ac17cf4561 100644 --- a/lib/fluent/test/base.rb +++ b/lib/fluent/test/base.rb @@ -14,27 +14,14 @@ # limitations under the License. # +require 'fluent/config' require 'fluent/engine' require 'fluent/system_config' -require 'fluent/config' +require 'fluent/test/log' require 'serverengine' module Fluent module Test - def self.setup - Fluent.__send__(:remove_const, :Engine) - engine = Fluent.const_set(:Engine, EngineClass.new).init(SystemConfig.new) - - engine.define_singleton_method(:now=) {|n| - @now = n - } - engine.define_singleton_method(:now) { - @now ||= super() - } - - nil - end - class TestDriver include ::Test::Unit::Assertions @@ -84,57 +71,6 @@ def run(num_waits = 10, &block) end end end - - class DummyLogDevice - attr_reader :logs - - def initialize - @logs = [] - end - - def reset - @logs = [] - end - - def tty? - false - end - - def puts(*args) - args.each{ |arg| write(arg + "\n") } - end - - def write(message) - @logs.push message - end - - def flush - true - end - - def close - true - end - end - - class TestLogger < Fluent::PluginLogger - def initialize - @logdev = DummyLogDevice.new - dl_opts = {} - dl_opts[:log_level] = ServerEngine::DaemonLogger::INFO - logger = ServerEngine::DaemonLogger.new(@logdev, dl_opts) - log = Fluent::Log.new(logger) - super(log) - end - - def reset - @logdev.reset - end - - def logs - @logdev.logs - end - end end end diff --git a/lib/fluent/test/log.rb b/lib/fluent/test/log.rb new file mode 100644 index 0000000000..59526f4bcb --- /dev/null +++ b/lib/fluent/test/log.rb @@ -0,0 +1,73 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'serverengine' +require 'fluent/log' + +module Fluent + module Test + class DummyLogDevice + attr_reader :logs + + def initialize + @logs = [] + end + + def reset + @logs = [] + end + + def tty? + false + end + + def puts(*args) + args.each{ |arg| write(arg + "\n") } + end + + def write(message) + @logs.push message + end + + def flush + true + end + + def close + true + end + end + + class TestLogger < Fluent::PluginLogger + def initialize + @logdev = DummyLogDevice.new + dl_opts = {} + dl_opts[:log_level] = ServerEngine::DaemonLogger::INFO + logger = ServerEngine::DaemonLogger.new(@logdev, dl_opts) + log = Fluent::Log.new(logger) + super(log) + end + + def reset + @logdev.reset + end + + def logs + @logdev.logs + end + end + end +end diff --git a/test/plugin/test_output_as_buffered.rb b/test/plugin/test_output_as_buffered.rb index ba191f7917..776e4747f5 100644 --- a/test/plugin/test_output_as_buffered.rb +++ b/test/plugin/test_output_as_buffered.rb @@ -125,6 +125,57 @@ def waiting(seconds) Timecop.return end + sub_test_case 'buffered output configured with many chunk keys' do + setup do + @stored_global_logger = $log + $log = Fluent::Test::TestLogger.new + @hash = { + 'flush_mode' => 'interval', + 'flush_thread_burst_interval' => 0.01, + 'chunk_limit_size' => 1024, + 'timekey' => 60, + } + @i = create_output(:buffered) + end + teardown do + $log = @stored_global_logger + end + test 'nothing are warned with less chunk keys' do + chunk_keys = 'time,key1,key2,key3' + @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_keys,@hash)])) + logs = @i.log.out.logs.dup + @i.start + assert{ logs.select{|log| log.include?('[warn]') }.size == 0 } + end + + test 'a warning reported with 4 chunk keys' do + chunk_keys = 'key1,key2,key3,key4' + @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_keys,@hash)])) + logs = @i.log.out.logs.dup + + @i.start # this calls `log.reset`... capturing logs about configure must be done before this line + assert_equal ['key1', 'key2', 'key3', 'key4'], @i.chunk_keys + + assert{ logs.select{|log| log.include?('[warn]: many chunk keys specified, and it may cause too many chunks on your system.') }.size == 1 } + end + + test 'a warning reported with 4 chunk keys including "tag"' do + chunk_keys = 'tag,key1,key2,key3' + @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_keys,@hash)])) + logs = @i.log.out.logs.dup + @i.start # this calls `log.reset`... capturing logs about configure must be done before this line + assert{ logs.select{|log| log.include?('[warn]: many chunk keys specified, and it may cause too many chunks on your system.') }.size == 1 } + end + + test 'time key is not included for warned chunk keys' do + chunk_keys = 'time,key1,key2,key3' + @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_keys,@hash)])) + logs = @i.log.out.logs.dup + @i.start + assert{ logs.select{|log| log.include?('[warn]') }.size == 0 } + end + end + sub_test_case 'buffered output feature without any buffer key, flush_mode: lazy' do setup do hash = {