Skip to content

Commit 478b487

Browse files
committed
Remove needless thread poll implementation
If we don't use `#sleep` to prevent reading bytes beyond limits, it is not needed to implement thread pool mechanism. We just need to get out reading file loop when bytes limit is reached. Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent d3a3023 commit 478b487

File tree

3 files changed

+9
-133
lines changed

3 files changed

+9
-133
lines changed

lib/fluent/plugin/in_tail.rb

+3-79
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,6 @@ def initialize
109109
config_param :path_timezone, :string, default: nil
110110
desc 'Follow inodes instead of following file names. Guarantees more stable delivery and allows to use * in path pattern with rotating files'
111111
config_param :follow_inodes, :bool, default: false
112-
desc 'Specify max size of thread pool'
113-
config_param :max_thread_pool_size, :integer, default: 1
114112

115113
config_section :parse, required: false, multi: true, init: true, param_name: :parser_configs do
116114
config_argument :usage, :string, default: 'in_tail_parser'
@@ -182,24 +180,14 @@ def configure(conf)
182180
# parser is already created by parser helper
183181
@parser = parser_create(usage: parser_config['usage'] || @parser_configs.first.usage)
184182
@capability = Fluent::Capability.new(:current_process)
185-
# Need to create thread pool because #sleep should pause entire thread on enabling log throttling feature.
186-
@thread_pool = nil
187-
@enable_read_bytes_limit_thread_pool = false
188183
if @read_bytes_limit_per_second > 0
189-
if @max_thread_pool_size < 0
190-
raise Fluent::ConfigError, "Specify positive number"
191-
end
192184
if !@enable_watch_timer
193185
raise Fluent::ConfigError, "Need to enable watch timer when using log throttling feature"
194186
end
195187
if @read_bytes_limit_per_second < 8192
196188
log.warn "Should specify greater equal than 8192. Use 8192 for read_bytes_limit_per_second"
197189
@read_bytes_limit_per_second = 8192
198190
end
199-
if @max_thread_pool_size < 2
200-
raise Fluent::ConfigError, "Specify 2 or more on max_thread_pool_size"
201-
end
202-
@enable_read_bytes_limit_thread_pool = true
203191
end
204192
end
205193

@@ -447,19 +435,9 @@ def construct_watcher(target_info)
447435
end
448436

449437
def start_watchers(targets_info)
450-
if @enable_read_bytes_limit_thread_pool
451-
@thread_pool = TailThread::Pool.new(@max_thread_pool_size) do |pool|
452-
targets_info.each_value {|target_info|
453-
pool.run {
454-
construct_watcher(target_info)
455-
}
456-
}
457-
end
458-
else
459-
targets_info.each_value {|target_info|
460-
construct_watcher(target_info)
461-
}
462-
end
438+
targets_info.each_value {|target_info|
439+
construct_watcher(target_info)
440+
}
463441
end
464442

465443
def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true)
@@ -478,10 +456,6 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch
478456
end
479457
end
480458
}
481-
if @thread_pool
482-
@thread_pool.stop
483-
@thread_pool = nil
484-
end
485459
end
486460

487461
def close_watcher_handles
@@ -712,56 +686,6 @@ def on_timer
712686
end
713687
end
714688

715-
class TailThread
716-
class Pool
717-
def initialize(max_size, &session)
718-
@max_size = max_size
719-
@queue = Queue.new
720-
@threads = []
721-
session.call(self)
722-
ensure
723-
terminate
724-
end
725-
726-
def run(&task)
727-
@queue.push(task)
728-
@threads << create_thread if @threads.size < @max_size
729-
end
730-
731-
def terminate
732-
until @queue.num_waiting == @threads.size
733-
sleep 0.01
734-
end
735-
@queue.close
736-
begin
737-
Timeout.timeout(1) do
738-
@threads.each{|th| th.join }
739-
end
740-
rescue Timeout::Error
741-
@threads.each {|th| th.kill }
742-
end
743-
end
744-
745-
def stop
746-
return unless running?
747-
748-
terminate
749-
end
750-
751-
protected def create_thread
752-
Thread.start(@queue) do |q|
753-
while task = q.pop
754-
task.call
755-
end
756-
end
757-
end
758-
759-
protected def running?
760-
!@queue.closed?
761-
end
762-
end
763-
end
764-
765689
class TailWatcher
766690
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build)
767691
@path = target_info.path

test/plugin/in_tail/test_tailthread_pool.rb

-26
This file was deleted.

test/plugin/test_in_tail.rb

+6-28
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
require 'timecop'
99
require 'tmpdir'
1010
require 'securerandom'
11-
require 'etc'
1211

1312
class TailInputTest < Test::Unit::TestCase
1413
include FlexMock::TestCase
@@ -103,7 +102,6 @@ def create_target_info(path)
103102
CONFIG_ENABLE_WATCH_TIMER = config_element("", "", { "enable_watch_timer" => false })
104103
CONFIG_DISABLE_STAT_WATCHER = config_element("", "", { "enable_stat_watcher" => false })
105104
CONFIG_OPEN_ON_EVERY_UPDATE = config_element("", "", { "open_on_every_update" => true })
106-
CONFIG_MAX_THREAD_POOL_SIZE = config_element("", "", { "max_thread_pool_size" => (Etc.nprocessors / 1.5).ceil })
107105
COMMON_FOLLOW_INODE_CONFIG = config_element("ROOT", "", {
108106
"path" => "#{TMP_DIR}/tail.txt*",
109107
"pos_file" => "#{TMP_DIR}/tail.pos",
@@ -159,7 +157,6 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)
159157
assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file
160158
assert_equal 1000, d.instance.read_lines_limit
161159
assert_equal -1, d.instance.read_bytes_limit_per_second
162-
assert_equal 1, d.instance.max_thread_pool_size
163160
assert_equal false, d.instance.ignore_repeated_permission_error
164161
assert_nothing_raised do
165162
d.instance.have_read_capability?
@@ -201,21 +198,14 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)
201198

202199
sub_test_case "log throttling per file" do
203200
test "w/o watcher timer is invalid" do
204-
conf = CONFIG_ENABLE_WATCH_TIMER + CONFIG_MAX_THREAD_POOL_SIZE + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
205-
assert_raise(Fluent::ConfigError) do
206-
create_driver(conf)
207-
end
208-
end
209-
210-
test "w/o 2 or more thread pool size is invalid" do
211-
conf = config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
201+
conf = CONFIG_ENABLE_WATCH_TIMER + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
212202
assert_raise(Fluent::ConfigError) do
213203
create_driver(conf)
214204
end
215205
end
216206

217207
test "valid" do
218-
conf = CONFIG_MAX_THREAD_POOL_SIZE + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
208+
conf = config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
219209
assert_raise(Fluent::ConfigError) do
220210
create_driver(conf)
221211
end
@@ -355,14 +345,6 @@ def test_emit_with_read_lines_limit(data)
355345
cleanup_file("#{TMP_DIR}/tail.txt")
356346
end
357347

358-
def count_thread_pool_object
359-
num = 0
360-
ObjectSpace.each_object(Fluent::Plugin::TailInput::TailThread::Pool) { |obj|
361-
num += 1
362-
}
363-
num
364-
end
365-
366348
data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2],
367349
"flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2],
368350
"flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
@@ -383,13 +365,13 @@ def test_emit_with_read_bytes_limit_per_second(data)
383365
config_style, limit, limit_bytes, num_events = data
384366
case config_style
385367
when :flat
386-
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + CONFIG_MAX_THREAD_POOL_SIZE
368+
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
387369
when :parse
388-
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + CONFIG_MAX_THREAD_POOL_SIZE
370+
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
389371
when :flat_without_stat
390-
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + CONFIG_MAX_THREAD_POOL_SIZE
372+
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
391373
when :parse_without_stat
392-
config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + CONFIG_MAX_THREAD_POOL_SIZE
374+
config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
393375
end
394376
d = create_driver(config)
395377
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.
@@ -403,10 +385,6 @@ def test_emit_with_read_bytes_limit_per_second(data)
403385
}
404386
end
405387

406-
assert do
407-
count_thread_pool_object >= 1
408-
end
409-
410388
events = d.events
411389
assert_true(events.length <= num_events)
412390
assert_equal({"message" => msg}, events[0][2])

0 commit comments

Comments
 (0)