Skip to content

Commit

Permalink
in_tail: Add group based log collection from in_tail_with_throttle
Browse files Browse the repository at this point in the history
Signed-off-by: Pranjal Gupta <[email protected]>
  • Loading branch information
Pranjal-Gupta2 committed Jan 7, 2022
1 parent 178b077 commit 514126c
Show file tree
Hide file tree
Showing 4 changed files with 379 additions and 608 deletions.
182 changes: 180 additions & 2 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class TailInput < Fluent::Plugin::Input
RESERVED_CHARS = ['/', '*', '%'].freeze
MetricsInfo = Struct.new(:opened, :closed, :rotated)

DEFAULT_NAMESPACE = DEFAULT_APPNAME = /./
DEFAULT_LIMIT = -1

class WatcherSetupError < StandardError
def initialize(msg)
@message = msg
Expand All @@ -60,6 +63,9 @@ def initialize
@shutdown_start_time = nil
@metrics = nil
@startup = true
# Map rules with GroupWatcher objects
@group_watchers = {}
@sorted_path = nil
end

desc 'The paths to read. Multiple paths can be specified, separated by comma.'
Expand Down Expand Up @@ -120,7 +126,25 @@ def initialize
config_argument :usage, :string, default: 'in_tail_parser'
end

attr_reader :paths
config_section :group, param_name: :group, required: false, multi: false do
desc 'Regex for extracting group\'s metadata'
config_param :pattern,
:regexp,
default: /var\/log\/containers\/(?<appname>[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace>[^_]+)_(?<container>.+)-(?<docker_id>[a-z0-9]{64})\.log$/
desc 'Period of time in which the group_line_limit is applied'
config_param :rate_period, :time, default: 5

config_section :rule, multi: true, required: true do
desc 'Namespace key'
config_param :namespace, :array, value_type: :string, default: [DEFAULT_NAMESPACE]
desc 'App name key'
config_param :appname, :array, value_type: :string, default: [DEFAULT_APPNAME]
desc 'Maximum number of log lines allowed per group over a period of rate_period'
config_param :limit, :integer, default: DEFAULT_LIMIT
end
end

attr_reader :paths, :group_watchers

def configure(conf)
@variable_store = Fluent::VariableStore.fetch_or_build(:in_tail)
Expand Down Expand Up @@ -196,6 +220,14 @@ def configure(conf)
@read_bytes_limit_per_second = min_bytes
end
end

## Ensuring correct time period syntax
@group.rule.each { |rule|
raise "Metadata Group Limit >= DEFAULT_LIMIT" unless rule.limit >= DEFAULT_LIMIT
} unless @group.nil?

construct_groupwatchers unless @group.nil?

opened_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_opened_total", help_text: "Total number of opened files")
closed_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_closed_total", help_text: "Total number of closed files")
rotated_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_rotated_total", help_text: "Total number of rotated files")
Expand Down Expand Up @@ -235,6 +267,51 @@ def parse_encoding_param(encoding_name)
end
end

def construct_groupwatchers
@group.rule.each { |rule|
num_groups = rule.namespace.size * rule.appname.size

rule.namespace.each { |namespace|
namespace = /#{Regexp.quote(namespace)}/ unless namespace.eql?(DEFAULT_NAMESPACE)
@group_watchers[namespace] ||= {}

rule.appname.each { |appname|
appname = /#{Regexp.quote(appname)}/ unless appname.eql?(DEFAULT_APPNAME)
@group_watchers[namespace][appname] = GroupWatcher.new(@group.rate_period, rule.limit/num_groups)
}

@group_watchers[namespace][DEFAULT_APPNAME] ||= GroupWatcher.new(@group.rate_period)
}
}

if @group_watchers.dig(DEFAULT_NAMESPACE, DEFAULT_APPNAME).nil?
@group_watchers[DEFAULT_NAMESPACE] ||= {}
@group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME] = GroupWatcher.new(@group.rate_period)
end
end

def find_group(namespace, appname)
namespace_key = @group_watchers.keys.find { |regexp| namespace.match?(regexp) && regexp != DEFAULT_NAMESPACE }
namespace_key ||= DEFAULT_NAMESPACE

appname_key = @group_watchers[namespace_key].keys.find { |regexp| appname.match?(regexp) && regexp != DEFAULT_APPNAME }
appname_key ||= DEFAULT_APPNAME

@group_watchers[namespace_key][appname_key]
end

def find_group_from_metadata(path)
begin
metadata = @group.pattern.match(path)
group_watcher = find_group(metadata['namespace'], metadata['appname'])
rescue => e
$log.warn "Cannot find group from metadata, Adding file in the default group"
group_watcher = @group_watchers[DEFAULT_NAMESPACE][DEFAULT_APPNAME]
end

group_watcher
end

def start
super

Expand Down Expand Up @@ -406,6 +483,12 @@ def setup_watcher(target_info, pe)
event_loop_attach(watcher)
end

unless @group.nil?
group_watcher = find_group_from_metadata(target_info.path)
group_watcher.add(tw.path) unless group_watcher.include?(tw.path)
tw.group_watcher = group_watcher
end

tw
rescue => e
if tw
Expand Down Expand Up @@ -461,6 +544,11 @@ def start_watchers(targets_info)

def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true)
targets_info.each_value { |target_info|
unless @group.nil?
group_watcher = find_group_from_metadata(target_info.path)
group_watcher.delete(target_info.path)
end

if remove_watcher
tw = @tails.delete(target_info)
else
Expand Down Expand Up @@ -548,7 +636,7 @@ def detach_watcher_after_rotate_wait(tw, ino)
if @open_on_every_update
# Detach now because it's already closed, waiting it doesn't make sense.
detach_watcher(tw, ino)
elsif @read_bytes_limit_per_second < 0
elsif @read_bytes_limit_per_second < 0 || (!tw.group_watcher.nil? && tw.group_watcher.limit <= 0)
# throttling isn't enabled, just wait @rotate_wait
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
detach_watcher(tw, ino)
Expand Down Expand Up @@ -753,6 +841,79 @@ def on_timer
end
end

class GroupWatcher
attr_accessor :current_paths, :limit, :number_lines_read, :start_reading_time, :rate_period

FileCounter = Struct.new(
:number_lines_read,
:start_reading_time,
)

def initialize(rate_period = 60, limit = -1)
@current_paths = {}
@rate_period = rate_period
@limit = limit
end

def add(path)
@current_paths[path] = FileCounter.new(0, nil)
end

def include?(path)
@current_paths.key?(path)
end

def size
@current_paths.size
end

def delete(path)
@current_paths.delete(path)
end

def update_reading_time(path)
@current_paths[path].start_reading_time ||= Fluent::Clock.now
end

def update_lines_read(path, value)
@current_paths[path].number_lines_read += value
end

def reset_counter(path)
@current_paths[path].start_reading_time = nil
@current_paths[path].number_lines_read = 0
end

def time_spent_reading(path)
Fluent::Clock.now - @current_paths[path].start_reading_time
end

def limit_time_period_reached?(path)
time_spent_reading(path) < @rate_period
end

def limit_lines_reached?(path)
return true unless include?(path)
return true if @limit == 0

return false if @limit < 0
return false if @current_paths[path].number_lines_read < @limit / size

# update_reading_time(path)
if limit_time_period_reached?(path) # Exceeds limit
true
else # Does not exceed limit
reset_counter(path)
false
end
end

def to_s
super + " current_paths: #{@current_paths} rate_period: #{@rate_period} limit: #{@limit}"
end
end


class TailWatcher
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics)
@path = target_info.path
Expand All @@ -775,6 +936,11 @@ def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watch
attr_reader :line_buffer_timer_flusher
attr_accessor :unwatched # This is used for removing position entry from PositionFile
attr_reader :watchers
attr_accessor :group_watcher

def group_watcher=(group_watcher)
@group_watcher = group_watcher
end

def tag
@parsed_tag ||= @path.tr('/', '.').gsub(/\.+/, '.').gsub(/^\./, '')
Expand Down Expand Up @@ -997,6 +1163,10 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:,
@log.info "following tail of #{@path}"
end

def group_watcher
@watcher.group_watcher
end

def on_notify
@notify_mutex.synchronize { handle_notify }
end
Expand Down Expand Up @@ -1054,6 +1224,7 @@ def should_shutdown_now?

def handle_notify
return if limit_bytes_per_second_reached?
return if !group_watcher.nil? && group_watcher.limit_lines_reached?(@path)

with_io do |io|
begin
Expand All @@ -1063,12 +1234,19 @@ def handle_notify
begin
while true
@start_reading_time ||= Fluent::Clock.now
group_watcher.update_reading_time(@path) unless group_watcher.nil?
data = io.readpartial(BYTES_TO_READ, @iobuf)
@eof = false
@number_bytes_read += data.bytesize
@fifo << data
group_watcher.update_lines_read(@path, -@lines.size) unless group_watcher.nil?
@fifo.read_lines(@lines)
group_watcher.update_lines_read(@path, @lines.size) unless group_watcher.nil?

if !group_watcher.nil? && group_watcher.limit_lines_reached?(@path) || should_shutdown_now?
read_more = false
break
end
if limit_bytes_per_second_reached? || should_shutdown_now?
# Just get out from tailing loop.
read_more = false
Expand Down
Loading

0 comments on commit 514126c

Please sign in to comment.