diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 58154a9d93..c39c17c109 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -372,7 +372,7 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false) end def queue_full? - @queued_chunks_limit_size && (synchronize { @queue.size } >= @queued_chunks_limit_size) + synchronize { @queue.size } >= @queued_chunks_limit_size end def queued_records diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 85a5593ebf..66077254c9 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -354,6 +354,10 @@ def configure(conf) log.warn "'flush_interval' is ignored because default 'flush_mode' is not 'interval': '#{@flush_mode}'" end end + + if @buffer.queued_chunks_limit_size.nil? + @buffer.queued_chunks_limit_size = @buffer_config.flush_thread_count + end end if @secondary_config diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb index 728bc9b84d..88e4b631be 100644 --- a/test/plugin/test_buffer.rb +++ b/test/plugin/test_buffer.rb @@ -170,7 +170,7 @@ def create_chunk_es(metadata, es) sub_test_case 'with default configuration and dummy implementation' do setup do - @p = create_buffer({}) + @p = create_buffer({'queued_chunks_limit_size' => 100}) @dm0 = create_metadata(Time.parse('2016-04-11 16:00:00 +0000').to_i, nil, nil) @dm1 = create_metadata(Time.parse('2016-04-11 16:10:00 +0000').to_i, nil, nil) @dm2 = create_metadata(Time.parse('2016-04-11 16:20:00 +0000').to_i, nil, nil) diff --git a/test/plugin/test_output_as_buffered.rb b/test/plugin/test_output_as_buffered.rb index 2f94d35a13..ca3552b4d2 100644 --- a/test/plugin/test_output_as_buffered.rb +++ b/test/plugin/test_output_as_buffered.rb @@ -219,6 +219,24 @@ def waiting(seconds) Timecop.return end + test 'queued_chunks_limit_size is same as flush_thread_count by default' do + hash = {'flush_thread_count' => 4} + i = create_output + i.register(:prefer_buffered_processing) { true } + i.configure(config_element('ROOT', '', {}, [config_element('buffer','tag',hash)])) + + assert_equal 4, i.buffer.queued_chunks_limit_size + end + + test 'prefer queued_chunks_limit_size parameter than flush_thread_count' do + hash = {'flush_thread_count' => 4, 'queued_chunks_limit_size' => 2} + i = create_output + i.register(:prefer_buffered_processing) { true } + i.configure(config_element('ROOT', '', {}, [config_element('buffer','tag',hash)])) + + assert_equal 2, i.buffer.queued_chunks_limit_size + end + sub_test_case 'chunk feature in #write for output plugins' do setup do @stored_global_logger = $log @@ -1064,6 +1082,7 @@ def waiting(seconds) 'flush_thread_count' => 1, 'flush_thread_burst_interval' => 0.1, 'chunk_limit_size' => 1024, + 'queued_chunks_limit_size' => 100 } @i = create_output(:buffered) @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)])) diff --git a/test/plugin/test_output_as_buffered_retries.rb b/test/plugin/test_output_as_buffered_retries.rb index e7d74bdc49..7d76cd3bba 100644 --- a/test/plugin/test_output_as_buffered_retries.rb +++ b/test/plugin/test_output_as_buffered_retries.rb @@ -123,6 +123,7 @@ def get_log_time(msg, logs) 'flush_interval' => 1, 'flush_thread_burst_interval' => 0.1, 'retry_randomize' => false, + 'queued_chunks_limit_size' => 100 } @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)])) @i.register(:prefer_buffered_processing){ true } @@ -252,6 +253,7 @@ def get_log_time(msg, logs) 'flush_thread_burst_interval' => 0.1, 'retry_randomize' => false, 'retry_timeout' => 3600, + 'queued_chunks_limit_size' => 100 } @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)])) @i.register(:prefer_buffered_processing){ true } @@ -342,6 +344,7 @@ def get_log_time(msg, logs) 'flush_thread_burst_interval' => 0.1, 'retry_randomize' => false, 'retry_max_times' => 10, + 'queued_chunks_limit_size' => 100 } @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)])) @i.register(:prefer_buffered_processing){ true } @@ -485,6 +488,7 @@ def get_log_time(msg, logs) 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_randomize' => false, + 'queued_chunks_limit_size' => 100 } @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)])) @i.register(:prefer_buffered_processing){ true } @@ -531,6 +535,7 @@ def get_log_time(msg, logs) 'retry_wait' => 30, 'retry_randomize' => false, 'retry_timeout' => 120, + 'queued_chunks_limit_size' => 100 } @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)])) @i.register(:prefer_buffered_processing){ true } @@ -626,6 +631,7 @@ def get_log_time(msg, logs) 'retry_wait' => 3, 'retry_randomize' => false, 'retry_max_times' => 10, + 'queued_chunks_limit_size' => 100 } @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)])) @i.register(:prefer_buffered_processing){ true } @@ -724,6 +730,7 @@ def get_log_time(msg, logs) 'retry_randomize' => false, 'retry_timeout' => 3600, 'retry_max_times' => 10, + 'queued_chunks_limit_size' => 100 } @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)])) @i.register(:prefer_buffered_processing){ true } @@ -795,6 +802,7 @@ def get_log_time(msg, logs) 'retry_wait' => 30, 'retry_timeout' => 360, 'retry_max_times' => 10, + 'queued_chunks_limit_size' => 100 } @i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)])) @i.register(:prefer_buffered_processing){ true } diff --git a/test/plugin/test_output_as_buffered_secondary.rb b/test/plugin/test_output_as_buffered_secondary.rb index a98f0c68bf..d1b398f7b6 100644 --- a/test/plugin/test_output_as_buffered_secondary.rb +++ b/test/plugin/test_output_as_buffered_secondary.rb @@ -544,7 +544,7 @@ def dummy_event_stream test 'secondary plugin can do delayed commit even if primary does not do it, and non-committed chunks will be rollbacked by primary' do written = [] chunks = [] - priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_timeout' => 60, 'delayed_commit_timeout' => 2, 'retry_randomize' => false}) + priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_timeout' => 60, 'delayed_commit_timeout' => 2, 'retry_randomize' => false, 'queued_chunks_limit_size' => 10}) secconf = config_element('secondary','',{'@type' => 'output_secondary_test2'}) @i.configure(config_element('ROOT','',{},[priconf,secconf])) @i.register(:prefer_buffered_processing){ true }