Skip to content

Commit

Permalink
Merge pull request fluent#2674 from ganmacs/set-system-config-variabl…
Browse files Browse the repository at this point in the history
…es-explicitly

Set system config variables explicitly
  • Loading branch information
ganmacs authored Oct 30, 2019
2 parents cabffa9 + 29fd4d7 commit 0ac4f42
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 260 deletions.
10 changes: 7 additions & 3 deletions lib/fluent/command/fluentd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@

exit 0 if early_exit

require 'fluent/supervisor'
if opts[:supervise]
if Fluent.windows?
if opts[:log_path] && opts[:log_path] != "-"
Expand All @@ -321,11 +320,16 @@
end
end
end
Fluent::Supervisor.new(opts).run_supervisor

supervisor = Fluent::Supervisor.new(opts)
supervisor.configure(supervisor: true)
supervisor.run_supervisor
else
if opts[:standalone_worker] && opts[:workers] && opts[:workers] > 1
puts "Error: multi workers is not supported with --no-supervisor"
exit 2
end
Fluent::Supervisor.new(opts).run_worker
worker = Fluent::Supervisor.new(opts)
worker.configure
worker.run_worker
end
1 change: 1 addition & 0 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def configure(conf)
end

def add_plugin_dir(dir)
$log.warn('Deprecated method: this method is going to be deleted. Use Fluent::Plugin.add_plugin_dir')
Plugin.add_plugin_dir(dir)
end

Expand Down
204 changes: 108 additions & 96 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -400,9 +400,9 @@ def reopen!
self
end

def apply_options(opts)
$log.format = opts[:format] if opts[:format]
$log.time_format = opts[:time_format] if opts[:time_format]
def apply_options(format: nil, time_format: nil)
$log.format = format if format
$log.time_format = time_format if time_format
end

def level=(level)
Expand Down Expand Up @@ -444,7 +444,6 @@ def self.cleanup_resources

def initialize(opt)
@daemonize = opt[:daemonize]
@supervise = opt[:supervise]
@standalone_worker= opt[:standalone_worker]
@config_path = opt[:config_path]
@inline_config = opt[:inline_config]
Expand All @@ -457,56 +456,42 @@ def initialize(opt)
@plugin_dirs = opt[:plugin_dirs]
@chgroup = opt[:chgroup]
@chuser = opt[:chuser]
@process_name = nil

@workers = opt[:workers]
@root_dir = opt[:root_dir]
@log_level = opt[:log_level]
@log_rotate_age = opt[:log_rotate_age]
@log_rotate_size = opt[:log_rotate_size]
@suppress_interval = opt[:suppress_interval]
@suppress_config_dump = opt[:suppress_config_dump]
@log_event_verbose = opt[:log_event_verbose]
@without_source = opt[:without_source]
@signame = opt[:signame]

@suppress_repeated_stacktrace = opt[:suppress_repeated_stacktrace]
log_opts = {suppress_repeated_stacktrace: @suppress_repeated_stacktrace}
@cl_opt = opt
@conf = nil

log_opts = { suppress_repeated_stacktrace: opt[:suppress_repeated_stacktrace] }
@log = LoggerInitializer.new(
@log_path, @log_level, @chuser, @chgroup, log_opts,
@log_path, opt[:log_level], @chuser, @chgroup, log_opts,
log_rotate_age: @log_rotate_age,
log_rotate_size: @log_rotate_size
)
@finished = false
end

def run_supervisor
@log.init(:supervisor, 0)
show_plugin_config if @show_plugin_config
read_config
set_system_config
@log.apply_options(format: @system_config.log.format, time_format: @system_config.log.time_format)

$log.info :supervisor, "parsing config file is succeeded", path: @config_path

if @workers < 1
raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{@workers}"
if @system_config.workers < 1
raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{@system_config.workers}"
end

if @root_dir
if File.exist?(@root_dir)
unless Dir.exist?(@root_dir)
raise Fluent::InvalidRootDirectory, "non directory entry exists:#{@root_dir}"
root_dir = @system_config.root_dir
if root_dir
if File.exist?(root_dir)
unless Dir.exist?(root_dir)
raise Fluent::InvalidRootDirectory, "non directory entry exists:#{root_dir}"
end
else
begin
FileUtils.mkdir_p(@root_dir)
FileUtils.mkdir_p(root_dir)
rescue => e
raise Fluent::InvalidRootDirectory, "failed to create root directory:#{@root_dir}, #{e.inspect}"
raise Fluent::InvalidRootDirectory, "failed to create root directory:#{root_dir}, #{e.inspect}"
end
end
end

dry_run_cmd if @dry_run
supervise
end
Expand All @@ -517,7 +502,7 @@ def options
'pid_file' => @daemonize,
'plugin_dirs' => @plugin_dirs,
'log_path' => @log_path,
'root_dir' => @root_dir,
'root_dir' => @system_config.root_dir,
}
end

Expand All @@ -527,22 +512,11 @@ def run_worker
rescue Exception
# ignore LoadError and others (related with signals): it may raise these errors in Windows
end
worker_id = ENV['SERVERENGINE_WORKER_ID'].to_i
process_type = case
when @standalone_worker then :standalone
when worker_id == 0 then :worker0
else :workers
end
@log.init(process_type, worker_id)
show_plugin_config if @show_plugin_config
read_config
set_system_config
@log.apply_options(format: @system_config.log.format, time_format: @system_config.log.time_format)

Process.setproctitle("worker:#{@process_name}") if @process_name
Process.setproctitle("worker:#{@system_config.process_name}") if @process_name

if @standalone_worker && @workers != 1
raise Fluent::ConfigError, "invalid number of workers (must be 1 or unspecified) with --no-supervisor: #{@workers}"
if @standalone_worker && @system_config.workers != 1
raise Fluent::ConfigError, "invalid number of workers (must be 1 or unspecified) with --no-supervisor: #{@system_config.workers}"
end

install_main_process_signal_handlers
Expand All @@ -562,6 +536,50 @@ def run_worker
end
end

def configure(supervisor: false)
if supervisor
@log.init(:supervisor, 0)
else
worker_id = ENV['SERVERENGINE_WORKER_ID'].to_i
process_type = case
when @standalone_worker then :standalone
when worker_id == 0 then :worker0
else :workers
end
@log.init(process_type, worker_id)
end

if @show_plugin_config
show_plugin_config
end

@conf = read_config
@system_config = build_system_config(@conf)

@log.level = @system_config.log_level
@log.apply_options(format: @system_config.log.format, time_format: @system_config.log.time_format)

$log.info :supervisor, 'parsing config file is succeeded', path: @config_path

@libs.each do |lib|
require lib
end

@plugin_dirs.each do |dir|
if Dir.exist?(dir)
dir = File.expand_path(dir)
Fluent::Plugin.add_plugin_dir(dir)
end
end

if supervisor
# plugins / configuration dumps
Gem::Specification.find_all.select { |x| x.name =~ /^fluent(d|-(plugin|mixin)-.*)$/ }.each do |spec|
$log.info("gem '#{spec.name}' version '#{spec.version}'")
end
end
end

private

def create_socket_manager
Expand All @@ -586,7 +604,7 @@ def dry_run
Fluent::Engine.dry_run_mode = true
change_privilege
MessagePackFactory.init
init_engine(supervisor: true)
init_engine
run_configure
rescue Fluent::ConfigError => e
$log.error "config error", file: @config_path, error: e
Expand All @@ -607,7 +625,7 @@ def supervise
# Make dumpable conf, which is set corresponding_proxies for all elements in all worker sections
dry_run

Process.setproctitle("supervisor:#{@process_name}") if @process_name
Process.setproctitle("supervisor:#{@system_config.process_name}") if @system_config.process_name
$log.info "starting fluentd-#{Fluent::VERSION}", pid: Process.pid, ruby: RUBY_VERSION

rubyopt = ENV["RUBYOPT"]
Expand All @@ -619,24 +637,24 @@ def supervise

$log.info "spawn command to main: ", cmdline: fluentd_spawn_cmd

params = {}
params['main_cmd'] = fluentd_spawn_cmd
params['daemonize'] = @daemonize
params['inline_config'] = @inline_config
params['log_path'] = @log_path
params['log_rotate_age'] = @log_rotate_age
params['log_rotate_size'] = @log_rotate_size
params['chuser'] = @chuser
params['chgroup'] = @chgroup
params['use_v1_config'] = @use_v1_config
params['conf_encoding'] = @conf_encoding

# system config parameters
params['workers'] = @workers
params['root_dir'] = @root_dir
params['log_level'] = @log_level
params['suppress_repeated_stacktrace'] = @suppress_repeated_stacktrace
params['signame'] = @signame
params = {
'main_cmd' => fluentd_spawn_cmd,
'daemonize' => @daemonize,
'inline_config' => @inline_config,
'log_path' => @log_path,
'log_rotate_age' => @log_rotate_age,
'log_rotate_size' => @log_rotate_size,
'chuser' => @chuser,
'chgroup' => @chgroup,
'use_v1_config' => @use_v1_config,
'conf_encoding' => @conf_encoding,
'signame' => @signame,

'workers' => @system_config.workers,
'root_dir' => @system_config.root_dir,
'log_level' => @system_config.log_level,
'suppress_repeated_stacktrace' => @system_config.suppress_repeated_stacktrace,
}

se = ServerEngine.create(ServerModule, WorkerModule){
Fluent::Supervisor.load_config(@config_path, params)
Expand Down Expand Up @@ -721,18 +739,18 @@ def logging_with_console_output
unless @log.stdout?
logger = ServerEngine::DaemonLogger.new(STDOUT)
log = Fluent::Log.new(logger)
log.level = @log_level
log.level = @system_config.log_level
console = log.enable_debug
yield console
end
end

def main_process(&block)
if @process_name
if @workers > 1
Process.setproctitle("worker:#{@process_name}#{ENV['SERVERENGINE_WORKER_ID']}")
if @system_config.process_name
if @system_config.workers > 1
Process.setproctitle("worker:#{@system_config.process_name}#{ENV['SERVERENGINE_WORKER_ID']}")
else
Process.setproctitle("worker:#{@process_name}")
Process.setproctitle("worker:#{@system_config.process_name}")
end
end

Expand Down Expand Up @@ -781,39 +799,33 @@ def read_config
elsif @inline_config
config_data << "\n" << @inline_config.gsub("\\n","\n")
end
@conf = Fluent::Config.parse(config_data, config_fname, config_basedir, @use_v1_config)
Fluent::Config.parse(config_data, config_fname, config_basedir, @use_v1_config)
end

def set_system_config
@system_config = SystemConfig.create(@conf) # @conf is set in read_config
@system_config.attach(self)
@system_config.apply(self)
def build_system_config(conf)
system_config = SystemConfig.create(conf)
opt = {}
Fluent::SystemConfig::SYSTEM_CONFIG_PARAMETERS.each do |param|
if @cl_opt.key?(param) && !@cl_opt[param].nil?
if param == :log_level && opt[:log_level] == Fluent::Log::LEVEL_INFO
# info level can't be specified via command line option.
# log_level is info here, it is default value and <system>'s log_level should be applied if exists.
next
end

opt[param] = @cl_opt[param]
end
end
system_config.overwrite_variables(opt)
system_config
end

def change_privilege
ServerEngine::Privilege.change(@chuser, @chgroup)
end

def init_engine(supervisor: false)
def init_engine
Fluent::Engine.init(@system_config)

@libs.each {|lib|
require lib
}

@plugin_dirs.each {|dir|
if Dir.exist?(dir)
dir = File.expand_path(dir)
Fluent::Engine.add_plugin_dir(dir)
end
}

if supervisor
# plugins / configuration dumps
Gem::Specification.find_all.select { |x| x.name =~ /^fluent(d|-(plugin|mixin)-.*)$/ }.each do |spec|
$log.info("gem '#{spec.name}' version '#{spec.version}'")
end
end
end

def run_configure
Expand Down
Loading

0 comments on commit 0ac4f42

Please sign in to comment.