Skip to content

Commit

Permalink
Merge pull request #3390 from ashie/ensure-eof-on-throttling
Browse files Browse the repository at this point in the history
in_tail: Ensure to reach EOF after rotate even if throttling is enabled
  • Loading branch information
kenhys authored Jul 12, 2021
2 parents c4593ab + 9904728 commit b955a2c
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 1 deletion.
33 changes: 32 additions & 1 deletion lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,25 @@ def detach_watcher(tw, ino, close_io = true)
def detach_watcher_after_rotate_wait(tw, ino)
# Call event_loop_attach/event_loop_detach is high-cost for short-live object.
# If this has a problem with large number of files, use @_event_loop directly instead of timer_execute.
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
if @open_on_every_update
# Detach now because it's already closed, waiting it doesn't make sense.
detach_watcher(tw, ino)
elsif @read_bytes_limit_per_second < 0
# throttling isn't enabled, just wait @rotate_wait
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
detach_watcher(tw, ino)
end
else
# When the throttling feature is enabled, it might not reach EOF yet.
# Should ensure to read all contents before closing it, with keeping throttling.
start_time_to_wait = Fluent::Clock.now
timer = timer_execute(:in_tail_close_watcher, 1, repeat: true) do
elapsed = Fluent::Clock.now - start_time_to_wait
if tw.eof? && elapsed >= @rotate_wait
timer.detach
detach_watcher(tw, ino)
end
end
end
end

Expand Down Expand Up @@ -736,6 +753,10 @@ def close
end
end

def eof?
@io_handler.eof?
end

def on_notify
begin
stat = Fluent::FileWrapper.stat(@path)
Expand Down Expand Up @@ -923,6 +944,7 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:,
@shutdown_start_time = nil
@shutdown_timeout = SHUTDOWN_TIMEOUT
@shutdown_mutex = Mutex.new
@eof = false

@log.info "following tail of #{@path}"
end
Expand All @@ -949,6 +971,10 @@ def opened?
!!@io
end

def eof?
@eof
end

private

def limit_bytes_per_second_reached?
Expand Down Expand Up @@ -989,6 +1015,7 @@ def handle_notify
while true
@start_reading_time ||= Fluent::Clock.now
data = io.readpartial(BYTES_TO_READ, @iobuf)
@eof = false
@number_bytes_read += data.bytesize
@fifo << data
@fifo.read_lines(@lines)
Expand All @@ -1005,6 +1032,7 @@ def handle_notify
end
end
rescue EOFError
@eof = true
end
end

Expand Down Expand Up @@ -1042,14 +1070,17 @@ def with_io
else
@io ||= open
yield @io
@eof = true if @io.nil?
end
rescue WatcherSetupError => e
close
@eof = true
raise e
rescue
@log.error $!.to_s
@log.error_backtrace
close
@eof = true
end
end

Expand Down
85 changes: 85 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,91 @@ def test_emit_with_read_bytes_limit_per_second(data)
assert_equal([], d.events)
end
end

sub_test_case "EOF with reads_bytes_per_second" do
def test_longer_than_rotate_wait
limit_bytes = 8192
num_lines = 1024 * 3
msg = "08bytes"

File.open("#{TMP_DIR}/tail.txt", "wb") do |f|
f.write("#{msg}\n" * num_lines)
end

config = CONFIG_READ_FROM_HEAD +
SINGLE_LINE_CONFIG +
config_element("", "", {
"read_bytes_limit_per_second" => limit_bytes,
"rotate_wait" => 0.1,
"refresh_interval" => 0.5,
})

rotated = false
d = create_driver(config)
d.run(timeout: 10) do
while d.events.size < num_lines do
if d.events.size > 0 && !rotated
cleanup_file("#{TMP_DIR}/tail.txt")
FileUtils.touch("#{TMP_DIR}/tail.txt")
rotated = true
end
sleep 0.3
end
end

assert_equal(num_lines,
d.events.count do |event|
event[2]["message"] == msg
end)
end

def test_shorter_than_rotate_wait
limit_bytes = 8192
num_lines = 1024 * 2
msg = "08bytes"

File.open("#{TMP_DIR}/tail.txt", "wb") do |f|
f.write("#{msg}\n" * num_lines)
end

config = CONFIG_READ_FROM_HEAD +
SINGLE_LINE_CONFIG +
config_element("", "", {
"read_bytes_limit_per_second" => limit_bytes,
"rotate_wait" => 2,
"refresh_interval" => 0.5,
})

start_time = Fluent::Clock.now
rotated = false
detached = false
d = create_driver(config)
mock.proxy(d.instance).setup_watcher(anything, anything) do |tw|
mock.proxy(tw).detach(anything) do |v|
detached = true
v
end
tw
end.twice

d.run(timeout: 10) do
until detached do
if d.events.size > 0 && !rotated
cleanup_file("#{TMP_DIR}/tail.txt")
FileUtils.touch("#{TMP_DIR}/tail.txt")
rotated = true
end
sleep 0.3
end
end

assert_true(Fluent::Clock.now - start_time > 2)
assert_equal(num_lines,
d.events.count do |event|
event[2]["message"] == msg
end)
end
end
end

data(flat: CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG,
Expand Down

0 comments on commit b955a2c

Please sign in to comment.