Skip to content

Commit

Permalink
Merge pull request #2716 from ganmacs/light-reload
Browse files Browse the repository at this point in the history
Reload new config without restarting process
  • Loading branch information
repeatedly authored Jan 7, 2020
2 parents 25de349 + d8b8bc6 commit 2800465
Show file tree
Hide file tree
Showing 15 changed files with 929 additions and 138 deletions.
19 changes: 19 additions & 0 deletions lib/fluent/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,25 @@

module Fluent
module Config
# @param config_path [String] config file path
# @param encoding [String] encoding of config file
# @param additional_config [String] config which is added to last of config body
# @param use_v1_config [Bool] config is formatted with v1 or not
# @return [Fluent::Config]
def self.build(config_path:, encoding: 'utf-8', additional_config: nil, use_v1_config: true)
config_fname = File.basename(config_path)
config_basedir = File.dirname(config_path)
config_data = File.open(config_path, "r:#{encoding}:utf-8") do |f|
s = f.read
if additional_config
c = additional_config.gsub("\\n", "\n")
s += "\n#{c}"
end
s
end
Fluent::Config.parse(config_data, config_fname, config_basedir, use_v1_config)
end

def self.parse(str, fname, basepath = Dir.pwd, v1_config = nil, syntax: :v1)
parser = if fname =~ /\.rb$/ || syntax == :ruby
:ruby
Expand Down
69 changes: 60 additions & 9 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
require 'fluent/system_config'
require 'fluent/plugin'
require 'fluent/fluent_log_event_router'
require 'fluent/static_config_analysis'

module Fluent
class EngineClass
Expand Down Expand Up @@ -157,14 +158,47 @@ def run
raise
end

unless @log_event_verbose
$log.enable_event(false)
@fluent_log_event_router.graceful_stop
stop_phase(@root_agent)
end

# @param conf [Fluent::Config]
# @param supervisor [Bool]
# @reutrn nil
def reload_config(conf, supervisor: false)
# configure first to reduce down time while restarting
new_agent = RootAgent.new(log: log, system_config: @system_config)
ret = Fluent::StaticConfigAnalysis.call(conf, workers: system_config.workers)

ret.all_plugins.each do |plugin|
if plugin.respond_to?(:reloadable_plugin?) && !plugin.reloadable_plugin?
raise Fluent::ConfigError, "Unreloadable plugin plugin: #{Fluent::Plugin.lookup_type_from_class(plugin.class)}, plugin_id: #{plugin.plugin_id}, class_name: #{plugin.class})"
end
end
$log.info "shutting down fluentd worker", worker: worker_id
shutdown

@fluent_log_event_router.stop
# Assign @root_agent to new root_agent
# for https://github.com/fluent/fluentd/blob/fcef949ce40472547fde295ddd2cfe297e1eddd6/lib/fluent/plugin_helper/event_emitter.rb#L50
old_agent, @root_agent = @root_agent, new_agent
begin
@root_agent.configure(conf)
rescue
@root_agent = old_agent
raise
end

unless @suppress_config_dump
$log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}"
end

# supervisor doesn't handle actual data. so the following code is unnecessary.
if supervisor
old_agent.shutdown # to close thread created in #configure
return
end

stop_phase(old_agent)

$log.info 'restart fluentd worker', worker: worker_id
start_phase(new_agent)
end

def stop
Expand All @@ -189,12 +223,29 @@ def worker_id
end

private
def start

def stop_phase(root_agent)
unless @log_event_verbose
$log.enable_event(false)
@fluent_log_event_router.graceful_stop
end
$log.info 'shutting down fluentd worker', worker: worker_id
root_agent.shutdown

@fluent_log_event_router.stop
end

def start_phase(root_agent)
@fluent_log_event_router = FluentLogEventRouter.build(root_agent)
if @fluent_log_event_router.emittable?
$log.enable_event(true)
end

@root_agent.start
end

def shutdown
@root_agent.shutdown
def start
@root_agent.start
end
end

Expand Down
5 changes: 5 additions & 0 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ def inspect
# https://github.com/ruby/ruby/blob/trunk/gc.c#L788
"#<%s:%014x>" % [self.class.name, '0x%014x' % (__id__ << 1)]
end

def reloadable_plugin?
# Engine can't capture all class variables. so it's forbbiden to use class variables in each plugins if enabling reload.
self.class.class_variables.empty?
end
end
end
end
16 changes: 10 additions & 6 deletions lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
require 'fluent/plugin/buffer'
require 'fluent/plugin/buffer/file_chunk'
require 'fluent/system_config'
require 'fluent/variable_store'

module Fluent
module Plugin
Expand All @@ -43,19 +44,20 @@ class FileBuffer < Fluent::Plugin::Buffer
config_param :file_permission, :string, default: nil # '0644'
config_param :dir_permission, :string, default: nil # '0755'

@@buffer_paths = {}

def initialize
super
@symlink_path = nil
@multi_workers_available = false
@additional_resume_path = nil
@buffer_path = nil
@variable_store = nil
end

def configure(conf)
super

@variable_store = Fluent::VariableStore.fetch_or_build(:buf_file)

multi_workers_configured = owner.system_config.workers > 1 ? true : false

using_plugin_root_dir = false
Expand All @@ -69,13 +71,13 @@ def configure(conf)
end

type_of_owner = Plugin.lookup_type_from_class(@_owner.class)
if @@buffer_paths.has_key?(@path) && !called_in_test?
type_using_this_path = @@buffer_paths[@path]
if @variable_store.has_key?(@path) && !called_in_test?
type_using_this_path = @variable_store[@path]
raise ConfigError, "Other '#{type_using_this_path}' plugin already use same buffer path: type = #{type_of_owner}, buffer path = #{@path}"
end

@buffer_path = @path
@@buffer_paths[@buffer_path] = type_of_owner
@variable_store[@buffer_path] = type_of_owner

specified_directory_exists = File.exist?(@path) && File.directory?(@path)
unexisting_path_for_directory = !File.exist?(@path) && !@path.include?('.*')
Expand Down Expand Up @@ -125,7 +127,9 @@ def start
end

def stop
@@buffer_paths.delete(@buffer_path)
if @variable_store
@variable_store.delete(@buffer_path)
end

super
end
Expand Down
16 changes: 10 additions & 6 deletions lib/fluent/plugin/buf_file_single.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
require 'fluent/plugin/buffer'
require 'fluent/plugin/buffer/file_single_chunk'
require 'fluent/system_config'
require 'fluent/variable_store'

module Fluent
module Plugin
Expand Down Expand Up @@ -48,18 +49,19 @@ class FileSingleBuffer < Fluent::Plugin::Buffer
desc 'The permission of chunk directory. If no specified, <system> setting or 0755 is used'
config_param :dir_permission, :string, default: nil

@@buffer_paths = {}

def initialize
super

@multi_workers_available = false
@additional_resume_path = nil
@variable_store = nil
end

def configure(conf)
super

@variable_store = Fluent::VariableStore.fetch_or_build(:buf_file_single)

if @chunk_format == :auto
@chunk_format = owner.formatted_to_msgpack_binary? ? :msgpack : :text
end
Expand Down Expand Up @@ -117,12 +119,12 @@ def configure(conf)
end

type_of_owner = Plugin.lookup_type_from_class(@_owner.class)
if @@buffer_paths.has_key?(@path) && !called_in_test?
type_using_this_path = @@buffer_paths[@path]
if @variable_store.has_key?(@path) && !called_in_test?
type_using_this_path = @variable_store[@path]
raise Fluent::ConfigError, "Other '#{type_using_this_path}' plugin already uses same buffer path: type = #{type_of_owner}, buffer path = #{@path}"
end

@@buffer_paths[@path] = type_of_owner
@variable_store[@path] = type_of_owner
@dir_permission = if @dir_permission
@dir_permission.to_i(8)
else
Expand All @@ -145,7 +147,9 @@ def start
end

def stop
@@buffer_paths.delete(@path)
if @variable_store
@variable_store.delete(@path)
end

super
end
Expand Down
14 changes: 8 additions & 6 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
require 'fluent/event'
require 'fluent/plugin/buffer'
require 'fluent/plugin/parser_multiline'
require 'fluent/variable_store'

if Fluent.windows?
require_relative 'file_wrapper'
Expand Down Expand Up @@ -105,9 +106,8 @@ def initialize

attr_reader :paths

@@pos_file_paths = {}

def configure(conf)
@variable_store = Fluent::VariableStore.fetch_or_build(:in_tail)
compat_parameters_convert(conf, :parser)
parser_config = conf.elements('parse').first
unless parser_config
Expand Down Expand Up @@ -139,11 +139,11 @@ def configure(conf)

# TODO: Use plugin_root_dir and storage plugin to store positions if available
if @pos_file
if @@pos_file_paths.has_key?(@pos_file) && !called_in_test?
plugin_id_using_this_path = @@pos_file_paths[@pos_file]
if @variable_store.key?(@pos_file) && !called_in_test?
plugin_id_using_this_path = @variable_store[@pos_file]
raise Fluent::ConfigError, "Other 'in_tail' plugin already use same pos_file path: plugin_id = #{plugin_id_using_this_path}, pos_file path = #{@pos_file}"
end
@@pos_file_paths[@pos_file] = self.plugin_id
@variable_store[@pos_file] = self.plugin_id
else
$log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
$log.warn "this parameter is highly recommended to save the position to resume tailing."
Expand Down Expand Up @@ -212,7 +212,9 @@ def start
end

def stop
@@pos_file_paths.delete(@pos_file)
if @variable_store
@variable_store.delete(@pos_file)
end

super
end
Expand Down
13 changes: 9 additions & 4 deletions lib/fluent/plugin_id.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,29 @@
#

require 'set'
require 'fluent/variable_store'

module Fluent
module PluginId
@@configured_ids = Set.new

def initialize
super

@_plugin_id_variable_store = nil
@_plugin_root_dir = nil
@id = nil
end

def configure(conf)
@_plugin_id_variable_store = Fluent::VariableStore.fetch_or_build(:pluing_id, default_value: Set.new)
@id = conf['@id']
@_id_configured = !!@id # plugin id is explicitly configured by users (or not)
if @id
@id = @id.to_s
if @@configured_ids.include?(@id) && !plugin_id_for_test?
if @_plugin_id_variable_store.include?(@id) && !plugin_id_for_test?
raise Fluent::ConfigError, "Duplicated plugin id `#{@id}`. Check whole configuration and fix it."
end
@@configured_ids.add(@id)
@_plugin_id_variable_store.add(@id)
end

super
Expand Down Expand Up @@ -79,7 +82,9 @@ def plugin_root_dir
end

def stop
@@configured_ids.delete(@id)
if @_plugin_id_variable_store
@_plugin_id_variable_store.delete(@id)
end

super
end
Expand Down
Loading

0 comments on commit 2800465

Please sign in to comment.