Skip to content

Commit

Permalink
Merge pull request #2667 from ganmacs/make-engine-lighter
Browse files Browse the repository at this point in the history
Make engine lighter
  • Loading branch information
ganmacs authored Oct 29, 2019
2 parents a9ec7fa + 65b7429 commit a738583
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 88 deletions.
98 changes: 12 additions & 86 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
require 'fluent/time'
require 'fluent/system_config'
require 'fluent/plugin'
require 'fluent/fluent_log_event_router'

module Fluent
class EngineClass
Expand All @@ -30,33 +31,22 @@ class EngineClass

def initialize
@root_agent = nil
@default_loop = nil
@engine_stopped = false
@_worker_id = nil

@log_event_router = nil
@log_emit_thread = nil
@log_event_loop_stop = false
@log_event_loop_graceful_stop = false
@log_event_queue = []
@log_event_verbose = false

@suppress_config_dump = false
@without_source = false

@fluent_log_event_router = nil
@system_config = SystemConfig.new

@dry_run_mode = false
end

MAINLOOP_SLEEP_INTERVAL = 0.3

MATCH_CACHE_SIZE = 1024
LOG_EMIT_INTERVAL = 0.1

attr_reader :root_agent
attr_reader :matches, :sources
attr_reader :system_config

attr_reader :root_agent, :system_config
attr_accessor :dry_run_mode

def init(system_config)
Expand Down Expand Up @@ -116,43 +106,14 @@ def run_configure(conf)
end

def configure(conf)
# plugins / configuration dumps
Gem::Specification.find_all.select{|x| x.name =~ /^fluent(d|-(plugin|mixin)-.*)$/}.each do |spec|
$log.info :worker0, "gem '#{spec.name}' version '#{spec.version}'"
end

@root_agent.configure(conf)

begin
log_event_agent = @root_agent.find_label(Fluent::Log::LOG_EVENT_LABEL)
log_event_router = log_event_agent.event_router

# suppress mismatched tags only for <label @FLUENT_LOG> label.
# it's not suppressed in default event router for non-log-event events
log_event_router.suppress_missing_match!
@fleunt_log_event_router = FluentLogEventRouter.build(@root_agent)

@log_event_router = log_event_router

unmatched_tags = Fluent::Log.event_tags.select{|t| !@log_event_router.match?(t) }
unless unmatched_tags.empty?
$log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end
rescue ArgumentError # ArgumentError "#{label_name} label not found"
# use default event router if <label @FLUENT_LOG> is missing in configuration
log_event_router = @root_agent.event_router

if Fluent::Log.event_tags.any?{|t| log_event_router.match?(t) }
@log_event_router = log_event_router

unmatched_tags = Fluent::Log.event_tags.select{|t| !@log_event_router.match?(t) }
unless unmatched_tags.empty?
$log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end
end
if @fleunt_log_event_router.emittable?
$log.enable_event(true)
end

$log.enable_event(true) if @log_event_router

unless @suppress_config_dump
$log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}"
end
Expand Down Expand Up @@ -183,39 +144,12 @@ def now
Fluent::EventTime.now
end

def log_event_loop
$log.disable_events(Thread.current)

while sleep(LOG_EMIT_INTERVAL)
break if @log_event_loop_stop
break if @log_event_loop_graceful_stop && @log_event_queue.empty?
next if @log_event_queue.empty?

# NOTE: thead-safe of slice! depends on GVL
events = @log_event_queue.slice!(0..-1)
next if events.empty?

events.each {|tag,time,record|
begin
@log_event_router.emit(tag, time, record)
rescue => e
# This $log.error doesn't emit log events, because of `$log.disable_events(Thread.current)` above
$log.error "failed to emit fluentd's log event", tag: tag, event: record, error: e
end
}
end
end

def run
begin
$log.info "starting fluentd worker", pid: Process.pid, ppid: Process.ppid, worker: worker_id
start

if @log_event_router
$log.enable_event(true)
@log_emit_thread = Thread.new(&method(:log_event_loop))
@log_emit_thread.abort_on_exception = true
end
@fleunt_log_event_router.start

$log.info "fluentd worker is now running", worker: worker_id
sleep MAINLOOP_SLEEP_INTERVAL until @engine_stopped
Expand All @@ -229,19 +163,12 @@ def run

unless @log_event_verbose
$log.enable_event(false)
if @log_emit_thread
# to make sure to emit all log events into router, before shutting down
@log_event_loop_graceful_stop = true
@log_emit_thread.join
@log_emit_thread = nil
end
@fleunt_log_event_router.graceful_stop
end
$log.info "shutting down fluentd worker", worker: worker_id
shutdown
if @log_emit_thread
@log_event_loop_stop = true
@log_emit_thread.join
end

@fleunt_log_event_router.stop
end

def stop
Expand All @@ -250,8 +177,7 @@ def stop
end

def push_log_event(tag, time, record)
return if @log_emit_thread.nil?
@log_event_queue.push([tag, time, record])
@fleunt_log_event_router.emit_event([tag, time, record])
end

def worker_id
Expand Down
137 changes: 137 additions & 0 deletions lib/fluent/fluent_log_event_router.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/log'

module Fluent
# DO NOT write any logic here
class NullFluentLogEventRouter
def start; end

def stop; end

def graceful_stop; end

def emit_event(_event); end

def emittable?
self.class != NullFluentLogEventRouter
end
end

# This class is for handling fluentd's inner log
# e.g. <label @FLUNT_LOG> section and <match fluent.**> section
class FluentLogEventRouter < NullFluentLogEventRouter
# @param root_agent [Fluent::RootAgent]
def self.build(root_agent)
log_event_router = nil

begin
log_event_agent = root_agent.find_label(Fluent::Log::LOG_EVENT_LABEL)
log_event_router = log_event_agent.event_router

# suppress mismatched tags only for <label @FLUENT_LOG> label.
# it's not suppressed in default event router for non-log-event events
log_event_router.suppress_missing_match!

log_event_router = log_event_router

unmatched_tags = Fluent::Log.event_tags.select { |t| !log_event_router.match?(t) }
unless unmatched_tags.empty?
$log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end

rescue ArgumentError # ArgumentError "#{label_name} label not found"
# use default event router if <label @FLUENT_LOG> is missing in configuration
root_log_event_router = root_agent.event_router

if Fluent::Log.event_tags.any? { |t| root_log_event_router.match?(t) }
log_event_router = root_log_event_router

unmatched_tags = Fluent::Log.event_tags.select { |t| !log_event_router.match?(t) }
unless unmatched_tags.empty?
$log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end
end
end

if log_event_router
FluentLogEventRouter.new(log_event_router)
else
$log.debug('No fluent logger for internal event')
NullFluentLogEventRouter.new
end
end

STOP = :stop
GRACEFUL_STOP = :graceful_stop

# @param event_router [Fluent::EventRouter]
def initialize(event_router)
@event_router = event_router
@thread = nil
@graceful_stop = false
@event_queue = Queue.new
end

def start
@thread = Thread.new do
$log.disable_events(Thread.current)

loop do
event = @event_queue.pop

case event
when GRACEFUL_STOP
@graceful_stop = true
when STOP
break
else
begin
tag, time, record = event
@event_router.emit(tag, time, record)
rescue => e
# This $log.error doesn't emit log events, because of `$log.disable_events(Thread.current)` above
$log.error "failed to emit fluentd's log event", tag: tag, event: record, error: e
end
end

if @graceful_stop && @event_queue.empty?
break
end
end
end

@thread.abort_on_exception = true
end

def stop
@event_queue.push(STOP)
# there is no problem calling Thread#join multiple times.
@thread && @thread.join
end

def graceful_stop
# to make sure to emit all log events into router, before shutting down
@event_queue.push(GRACEFUL_STOP)
@thread && @thread.join
end

def emit_event(event)
@event_queue.push(event)
end
end
end
11 changes: 9 additions & 2 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ def dry_run
Fluent::Engine.dry_run_mode = true
change_privilege
MessagePackFactory.init
init_engine
init_engine(supervisor: true)
run_configure
rescue Fluent::ConfigError => e
$log.error "config error", file: @config_path, error: e
Expand Down Expand Up @@ -794,7 +794,7 @@ def change_privilege
ServerEngine::Privilege.change(@chuser, @chgroup)
end

def init_engine
def init_engine(supervisor: false)
Fluent::Engine.init(@system_config)

@libs.each {|lib|
Expand All @@ -807,6 +807,13 @@ def init_engine
Fluent::Engine.add_plugin_dir(dir)
end
}

if supervisor
# plugins / configuration dumps
Gem::Specification.find_all.select { |x| x.name =~ /^fluent(d|-(plugin|mixin)-.*)$/ }.each do |spec|
$log.info("gem '#{spec.name}' version '#{spec.version}'")
end
end
end

def run_configure
Expand Down
Loading

0 comments on commit a738583

Please sign in to comment.