Skip to content

Commit

Permalink
Merge pull request #2173 from fluent/change-queued_chunks_limit_size-…
Browse files Browse the repository at this point in the history
…default-behaviour

Use flush_thread_count value for queued_chunks_limit_size when queued_chunks_limit_size is not specified
  • Loading branch information
repeatedly authored Nov 9, 2018
2 parents f287c60 + ec05947 commit a8d2bbb
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 3 deletions.
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
end

def queue_full?
@queued_chunks_limit_size && (synchronize { @queue.size } >= @queued_chunks_limit_size)
synchronize { @queue.size } >= @queued_chunks_limit_size
end

def queued_records
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ def configure(conf)
log.warn "'flush_interval' is ignored because default 'flush_mode' is not 'interval': '#{@flush_mode}'"
end
end

if @buffer.queued_chunks_limit_size.nil?
@buffer.queued_chunks_limit_size = @buffer_config.flush_thread_count
end
end

if @secondary_config
Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def create_chunk_es(metadata, es)

sub_test_case 'with default configuration and dummy implementation' do
setup do
@p = create_buffer({})
@p = create_buffer({'queued_chunks_limit_size' => 100})
@dm0 = create_metadata(Time.parse('2016-04-11 16:00:00 +0000').to_i, nil, nil)
@dm1 = create_metadata(Time.parse('2016-04-11 16:10:00 +0000').to_i, nil, nil)
@dm2 = create_metadata(Time.parse('2016-04-11 16:20:00 +0000').to_i, nil, nil)
Expand Down
19 changes: 19 additions & 0 deletions test/plugin/test_output_as_buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,24 @@ def waiting(seconds)
Timecop.return
end

test 'queued_chunks_limit_size is same as flush_thread_count by default' do
hash = {'flush_thread_count' => 4}
i = create_output
i.register(:prefer_buffered_processing) { true }
i.configure(config_element('ROOT', '', {}, [config_element('buffer','tag',hash)]))

assert_equal 4, i.buffer.queued_chunks_limit_size
end

test 'prefer queued_chunks_limit_size parameter than flush_thread_count' do
hash = {'flush_thread_count' => 4, 'queued_chunks_limit_size' => 2}
i = create_output
i.register(:prefer_buffered_processing) { true }
i.configure(config_element('ROOT', '', {}, [config_element('buffer','tag',hash)]))

assert_equal 2, i.buffer.queued_chunks_limit_size
end

sub_test_case 'chunk feature in #write for output plugins' do
setup do
@stored_global_logger = $log
Expand Down Expand Up @@ -1064,6 +1082,7 @@ def waiting(seconds)
'flush_thread_count' => 1,
'flush_thread_burst_interval' => 0.1,
'chunk_limit_size' => 1024,
'queued_chunks_limit_size' => 100
}
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
Expand Down
8 changes: 8 additions & 0 deletions test/plugin/test_output_as_buffered_retries.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def get_log_time(msg, logs)
'flush_interval' => 1,
'flush_thread_burst_interval' => 0.1,
'retry_randomize' => false,
'queued_chunks_limit_size' => 100
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing){ true }
Expand Down Expand Up @@ -252,6 +253,7 @@ def get_log_time(msg, logs)
'flush_thread_burst_interval' => 0.1,
'retry_randomize' => false,
'retry_timeout' => 3600,
'queued_chunks_limit_size' => 100
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing){ true }
Expand Down Expand Up @@ -342,6 +344,7 @@ def get_log_time(msg, logs)
'flush_thread_burst_interval' => 0.1,
'retry_randomize' => false,
'retry_max_times' => 10,
'queued_chunks_limit_size' => 100
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing){ true }
Expand Down Expand Up @@ -485,6 +488,7 @@ def get_log_time(msg, logs)
'retry_type' => :periodic,
'retry_wait' => 3,
'retry_randomize' => false,
'queued_chunks_limit_size' => 100
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing){ true }
Expand Down Expand Up @@ -531,6 +535,7 @@ def get_log_time(msg, logs)
'retry_wait' => 30,
'retry_randomize' => false,
'retry_timeout' => 120,
'queued_chunks_limit_size' => 100
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing){ true }
Expand Down Expand Up @@ -626,6 +631,7 @@ def get_log_time(msg, logs)
'retry_wait' => 3,
'retry_randomize' => false,
'retry_max_times' => 10,
'queued_chunks_limit_size' => 100
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing){ true }
Expand Down Expand Up @@ -724,6 +730,7 @@ def get_log_time(msg, logs)
'retry_randomize' => false,
'retry_timeout' => 3600,
'retry_max_times' => 10,
'queued_chunks_limit_size' => 100
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing){ true }
Expand Down Expand Up @@ -795,6 +802,7 @@ def get_log_time(msg, logs)
'retry_wait' => 30,
'retry_timeout' => 360,
'retry_max_times' => 10,
'queued_chunks_limit_size' => 100
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing){ true }
Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_output_as_buffered_secondary.rb
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ def dummy_event_stream
test 'secondary plugin can do delayed commit even if primary does not do it, and non-committed chunks will be rollbacked by primary' do
written = []
chunks = []
priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_timeout' => 60, 'delayed_commit_timeout' => 2, 'retry_randomize' => false})
priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_timeout' => 60, 'delayed_commit_timeout' => 2, 'retry_randomize' => false, 'queued_chunks_limit_size' => 10})
secconf = config_element('secondary','',{'@type' => 'output_secondary_test2'})
@i.configure(config_element('ROOT','',{},[priconf,secconf]))
@i.register(:prefer_buffered_processing){ true }
Expand Down

0 comments on commit a8d2bbb

Please sign in to comment.