Skip to content

Commit

Permalink
Merge pull request #3535 from Pranjal-Gupta2/feature/in_tail_throttle
Browse files Browse the repository at this point in the history
Add log throttling in files based on group rules
  • Loading branch information
ashie authored May 17, 2022
2 parents e9a58c8 + 5f54dd0 commit e55d409
Show file tree
Hide file tree
Showing 4 changed files with 543 additions and 13 deletions.
42 changes: 35 additions & 7 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
require 'fluent/variable_store'
require 'fluent/capability'
require 'fluent/plugin/in_tail/position_file'
require 'fluent/plugin/in_tail/group_watch'

if Fluent.windows?
require_relative 'file_wrapper'
Expand All @@ -33,6 +34,8 @@

module Fluent::Plugin
class TailInput < Fluent::Plugin::Input
include GroupWatch

Fluent::Plugin.register_input('tail', self)

helpers :timer, :event_loop, :parser, :compat_parameters
Expand Down Expand Up @@ -406,6 +409,8 @@ def setup_watcher(target_info, pe)
event_loop_attach(watcher)
end

tw.group_watcher = add_path_to_group_watcher(target_info.path)

tw
rescue => e
if tw
Expand Down Expand Up @@ -461,6 +466,8 @@ def start_watchers(targets_info)

def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true)
targets_info.each_value { |target_info|
remove_path_from_group_watcher(target_info.path)

if remove_watcher
tw = @tails.delete(target_info)
else
Expand Down Expand Up @@ -542,18 +549,19 @@ def detach_watcher(tw, ino, close_io = true)
end
end

def throttling_is_enabled?(tw)
return true if @read_bytes_limit_per_second > 0
return true if tw.group_watcher && tw.group_watcher.limit >= 0
false
end

def detach_watcher_after_rotate_wait(tw, ino)
# Call event_loop_attach/event_loop_detach is high-cost for short-live object.
# If this has a problem with large number of files, use @_event_loop directly instead of timer_execute.
if @open_on_every_update
# Detach now because it's already closed, waiting it doesn't make sense.
detach_watcher(tw, ino)
elsif @read_bytes_limit_per_second < 0
# throttling isn't enabled, just wait @rotate_wait
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
detach_watcher(tw, ino)
end
else
elsif throttling_is_enabled?(tw)
# When the throttling feature is enabled, it might not reach EOF yet.
# Should ensure to read all contents before closing it, with keeping throttling.
start_time_to_wait = Fluent::Clock.now
Expand All @@ -564,6 +572,11 @@ def detach_watcher_after_rotate_wait(tw, ino)
detach_watcher(tw, ino)
end
end
else
# when the throttling feature isn't enabled, just wait @rotate_wait
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
detach_watcher(tw, ino)
end
end
end

Expand Down Expand Up @@ -775,6 +788,7 @@ def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watch
attr_reader :line_buffer_timer_flusher
attr_accessor :unwatched # This is used for removing position entry from PositionFile
attr_reader :watchers
attr_accessor :group_watcher

def tag
@parsed_tag ||= @path.tr('/', '.').gsub(/\.+/, '.').gsub(/^\./, '')
Expand Down Expand Up @@ -997,6 +1011,10 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:,
@log.info "following tail of #{@path}"
end

def group_watcher
@watcher.group_watcher
end

def on_notify
@notify_mutex.synchronize { handle_notify }
end
Expand Down Expand Up @@ -1054,6 +1072,7 @@ def should_shutdown_now?

def handle_notify
return if limit_bytes_per_second_reached?
return if group_watcher&.limit_lines_reached?(@path)

with_io do |io|
begin
Expand All @@ -1063,17 +1082,26 @@ def handle_notify
begin
while true
@start_reading_time ||= Fluent::Clock.now
group_watcher&.update_reading_time(@path)

data = io.readpartial(BYTES_TO_READ, @iobuf)
@eof = false
@number_bytes_read += data.bytesize
@fifo << data

n_lines_before_read = @lines.size
@fifo.read_lines(@lines)
group_watcher&.update_lines_read(@path, @lines.size - n_lines_before_read)

if limit_bytes_per_second_reached? || should_shutdown_now?
group_watcher_limit = group_watcher&.limit_lines_reached?(@path)
@log.debug "Reading Limit exceeded #{@path} #{group_watcher.number_lines_read}" if group_watcher_limit

if group_watcher_limit || limit_bytes_per_second_reached? || should_shutdown_now?
# Just get out from tailing loop.
read_more = false
break
end

if @lines.size >= @read_lines_limit
# not to use too much memory in case the file is very large
read_more = true
Expand Down
204 changes: 204 additions & 0 deletions lib/fluent/plugin/in_tail/group_watch.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/input'

module Fluent::Plugin
class TailInput < Fluent::Plugin::Input
module GroupWatchParams
include Fluent::Configurable

DEFAULT_KEY = /.*/
DEFAULT_LIMIT = -1
REGEXP_JOIN = "_"

config_section :group, param_name: :group, required: false, multi: false do
desc 'Regex for extracting group\'s metadata'
config_param :pattern,
:regexp,
default: /^\/var\/log\/containers\/(?<podname>[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace>[^_]+)_(?<container>.+)-(?<docker_id>[a-z0-9]{64})\.log$/

desc 'Period of time in which the group_line_limit is applied'
config_param :rate_period, :time, default: 5

config_section :rule, param_name: :rule, required: true, multi: true do
desc 'Key-value pairs for grouping'
config_param :match, :hash, value_type: :regexp, default: { namespace: [DEFAULT_KEY], podname: [DEFAULT_KEY] }
desc 'Maximum number of log lines allowed per group over a period of rate_period'
config_param :limit, :integer, default: DEFAULT_LIMIT
end
end
end

module GroupWatch
def self.included(mod)
mod.include GroupWatchParams
end

attr_reader :group_watchers, :default_group_key

def initialize
super
@group_watchers = {}
@group_keys = nil
@default_group_key = nil
end

def configure(conf)
super

unless @group.nil?
## Ensuring correct time period syntax
@group.rule.each { |rule|
raise "Metadata Group Limit >= DEFAULT_LIMIT" unless rule.limit >= GroupWatchParams::DEFAULT_LIMIT
}

@group_keys = Regexp.compile(@group.pattern).named_captures.keys
@default_group_key = ([GroupWatchParams::DEFAULT_KEY] * @group_keys.length).join(GroupWatchParams::REGEXP_JOIN)

## Ensures that "specific" rules (with larger number of `rule.match` keys)
## have a higher priority against "generic" rules (with less number of `rule.match` keys).
## This will be helpful when a file satisfies more than one rule.
@group.rule.sort_by! { |rule| -rule.match.length() }
construct_groupwatchers
@group_watchers[@default_group_key] ||= GroupWatcher.new(@group.rate_period, GroupWatchParams::DEFAULT_LIMIT)
end
end

def add_path_to_group_watcher(path)
return nil if @group.nil?
group_watcher = find_group_from_metadata(path)
group_watcher.add(path) unless group_watcher.include?(path)
group_watcher
end

def remove_path_from_group_watcher(path)
return if @group.nil?
group_watcher = find_group_from_metadata(path)
group_watcher.delete(path)
end

def construct_group_key(named_captures)
match_rule = []
@group_keys.each { |key|
match_rule.append(named_captures.fetch(key, GroupWatchParams::DEFAULT_KEY))
}
match_rule = match_rule.join(GroupWatchParams::REGEXP_JOIN)

match_rule
end

def construct_groupwatchers
@group.rule.each { |rule|
match_rule = construct_group_key(rule.match)
@group_watchers[match_rule] ||= GroupWatcher.new(@group.rate_period, rule.limit)
}
end

def find_group(metadata)
metadata_key = construct_group_key(metadata)
gw_key = @group_watchers.keys.find { |regexp| metadata_key.match?(regexp) && regexp != @default_group_key }
gw_key ||= @default_group_key

@group_watchers[gw_key]
end

def find_group_from_metadata(path)
begin
metadata = @group.pattern.match(path).named_captures
group_watcher = find_group(metadata)
rescue
log.warn "Cannot find group from metadata, Adding file in the default group"
group_watcher = @group_watchers[@default_group_key]
end

group_watcher
end
end

class GroupWatcher
attr_accessor :current_paths, :limit, :number_lines_read, :start_reading_time, :rate_period

FileCounter = Struct.new(
:number_lines_read,
:start_reading_time,
)

def initialize(rate_period = 60, limit = -1)
@current_paths = {}
@rate_period = rate_period
@limit = limit
end

def add(path)
@current_paths[path] = FileCounter.new(0, nil)
end

def include?(path)
@current_paths.key?(path)
end

def size
@current_paths.size
end

def delete(path)
@current_paths.delete(path)
end

def update_reading_time(path)
@current_paths[path].start_reading_time ||= Fluent::Clock.now
end

def update_lines_read(path, value)
@current_paths[path].number_lines_read += value
end

def reset_counter(path)
@current_paths[path].start_reading_time = nil
@current_paths[path].number_lines_read = 0
end

def time_spent_reading(path)
Fluent::Clock.now - @current_paths[path].start_reading_time
end

def limit_time_period_reached?(path)
time_spent_reading(path) < @rate_period
end

def limit_lines_reached?(path)
return true unless include?(path)
return true if @limit == 0

return false if @limit < 0
return false if @current_paths[path].number_lines_read < @limit / size

# update_reading_time(path)
if limit_time_period_reached?(path) # Exceeds limit
true
else # Does not exceed limit
reset_counter(path)
false
end
end

def to_s
super + " current_paths: #{@current_paths} rate_period: #{@rate_period} limit: #{@limit}"
end
end
end
end
16 changes: 12 additions & 4 deletions test/plugin/in_tail/test_io_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@ class IntailIOHandlerTest < Test::Unit::TestCase
@file.unlink rescue nil
end

def create_target_info
Fluent::Plugin::TailInput::TargetInfo.new(@file.path, Fluent::FileWrapper.stat(@file.path).ino)
end

def create_watcher
Fluent::Plugin::TailInput::TailWatcher.new(create_target_info, nil, nil, nil, nil, nil, nil, nil, nil)
end

test '#on_notify load file content and passed it to receive_lines method' do
text = "this line is test\ntest line is test\n"
@file.write(text)
@file.close

watcher = 'watcher'
watcher = create_watcher

update_pos = 0

Expand Down Expand Up @@ -61,7 +69,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase

update_pos = 0

watcher = 'watcher'
watcher = create_watcher
stub(watcher).pe do
pe = 'position_file'
stub(pe).read_pos { 0 }
Expand Down Expand Up @@ -92,7 +100,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase

update_pos = 0

watcher = 'watcher'
watcher = create_watcher
stub(watcher).pe do
pe = 'position_file'
stub(pe).read_pos { 0 }
Expand All @@ -119,7 +127,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase

update_pos = 0

watcher = 'watcher'
watcher = create_watcher
stub(watcher).pe do
pe = 'position_file'
stub(pe).read_pos { 0 }
Expand Down
Loading

0 comments on commit e55d409

Please sign in to comment.