Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set system config variables explicitly #2674

Merged
merged 15 commits into from
Oct 30, 2019
10 changes: 7 additions & 3 deletions lib/fluent/command/fluentd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@

exit 0 if early_exit

require 'fluent/supervisor'
if opts[:supervise]
if Fluent.windows?
if opts[:log_path] && opts[:log_path] != "-"
Expand All @@ -321,11 +320,16 @@
end
end
end
Fluent::Supervisor.new(opts).run_supervisor

supervisor = Fluent::Supervisor.new(opts)
supervisor.configure(supervisor: true)
supervisor.run_supervisor
else
if opts[:standalone_worker] && opts[:workers] && opts[:workers] > 1
puts "Error: multi workers is not supported with --no-supervisor"
exit 2
end
Fluent::Supervisor.new(opts).run_worker
worker = Fluent::Supervisor.new(opts)
worker.configure
worker.run_worker
end
1 change: 1 addition & 0 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def configure(conf)
end

def add_plugin_dir(dir)
$log.warn('Deprecated method: this method is going to be deleted. Use Fluent::Plugin.add_plugin_dir')
Plugin.add_plugin_dir(dir)
end

Expand Down
204 changes: 108 additions & 96 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -400,9 +400,9 @@ def reopen!
self
end

def apply_options(opts)
$log.format = opts[:format] if opts[:format]
$log.time_format = opts[:time_format] if opts[:time_format]
def apply_options(format: nil, time_format: nil)
$log.format = format if format
$log.time_format = time_format if time_format
end

def level=(level)
Expand Down Expand Up @@ -444,7 +444,6 @@ def self.cleanup_resources

def initialize(opt)
@daemonize = opt[:daemonize]
@supervise = opt[:supervise]
@standalone_worker= opt[:standalone_worker]
@config_path = opt[:config_path]
@inline_config = opt[:inline_config]
Expand All @@ -457,56 +456,42 @@ def initialize(opt)
@plugin_dirs = opt[:plugin_dirs]
@chgroup = opt[:chgroup]
@chuser = opt[:chuser]
@process_name = nil

@workers = opt[:workers]
@root_dir = opt[:root_dir]
@log_level = opt[:log_level]
@log_rotate_age = opt[:log_rotate_age]
@log_rotate_size = opt[:log_rotate_size]
@suppress_interval = opt[:suppress_interval]
@suppress_config_dump = opt[:suppress_config_dump]
@log_event_verbose = opt[:log_event_verbose]
@without_source = opt[:without_source]
@signame = opt[:signame]

@suppress_repeated_stacktrace = opt[:suppress_repeated_stacktrace]
log_opts = {suppress_repeated_stacktrace: @suppress_repeated_stacktrace}
@cl_opt = opt
@conf = nil

log_opts = { suppress_repeated_stacktrace: opt[:suppress_repeated_stacktrace] }
@log = LoggerInitializer.new(
@log_path, @log_level, @chuser, @chgroup, log_opts,
@log_path, opt[:log_level], @chuser, @chgroup, log_opts,
log_rotate_age: @log_rotate_age,
log_rotate_size: @log_rotate_size
)
@finished = false
end

def run_supervisor
@log.init(:supervisor, 0)
show_plugin_config if @show_plugin_config
read_config
set_system_config
@log.apply_options(format: @system_config.log.format, time_format: @system_config.log.time_format)

$log.info :supervisor, "parsing config file is succeeded", path: @config_path

if @workers < 1
raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{@workers}"
if @system_config.workers < 1
raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{@system_config.workers}"
end

if @root_dir
if File.exist?(@root_dir)
unless Dir.exist?(@root_dir)
raise Fluent::InvalidRootDirectory, "non directory entry exists:#{@root_dir}"
root_dir = @system_config.root_dir
if root_dir
if File.exist?(root_dir)
unless Dir.exist?(root_dir)
raise Fluent::InvalidRootDirectory, "non directory entry exists:#{root_dir}"
end
else
begin
FileUtils.mkdir_p(@root_dir)
FileUtils.mkdir_p(root_dir)
rescue => e
raise Fluent::InvalidRootDirectory, "failed to create root directory:#{@root_dir}, #{e.inspect}"
raise Fluent::InvalidRootDirectory, "failed to create root directory:#{root_dir}, #{e.inspect}"
end
end
end

dry_run_cmd if @dry_run
supervise
end
Expand All @@ -517,7 +502,7 @@ def options
'pid_file' => @daemonize,
'plugin_dirs' => @plugin_dirs,
'log_path' => @log_path,
'root_dir' => @root_dir,
'root_dir' => @system_config.root_dir,
}
end

Expand All @@ -527,22 +512,11 @@ def run_worker
rescue Exception
# ignore LoadError and others (related with signals): it may raise these errors in Windows
end
worker_id = ENV['SERVERENGINE_WORKER_ID'].to_i
process_type = case
when @standalone_worker then :standalone
when worker_id == 0 then :worker0
else :workers
end
@log.init(process_type, worker_id)
show_plugin_config if @show_plugin_config
read_config
set_system_config
@log.apply_options(format: @system_config.log.format, time_format: @system_config.log.time_format)

Process.setproctitle("worker:#{@process_name}") if @process_name
Process.setproctitle("worker:#{@system_config.process_name}") if @process_name

if @standalone_worker && @workers != 1
raise Fluent::ConfigError, "invalid number of workers (must be 1 or unspecified) with --no-supervisor: #{@workers}"
if @standalone_worker && @system_config.workers != 1
raise Fluent::ConfigError, "invalid number of workers (must be 1 or unspecified) with --no-supervisor: #{@system_config.workers}"
end

install_main_process_signal_handlers
Expand All @@ -562,6 +536,50 @@ def run_worker
end
end

def configure(supervisor: false)
if supervisor
@log.init(:supervisor, 0)
else
worker_id = ENV['SERVERENGINE_WORKER_ID'].to_i
process_type = case
when @standalone_worker then :standalone
when worker_id == 0 then :worker0
else :workers
end
@log.init(process_type, worker_id)
end

if @show_plugin_config
show_plugin_config
end

@conf = read_config
@system_config = build_system_config(@conf)

@log.level = @system_config.log_level
@log.apply_options(format: @system_config.log.format, time_format: @system_config.log.time_format)

$log.info :supervisor, 'parsing config file is succeeded', path: @config_path

@libs.each do |lib|
require lib
end

@plugin_dirs.each do |dir|
if Dir.exist?(dir)
dir = File.expand_path(dir)
Fluent::Plugin.add_plugin_dir(dir)
end
end

if supervisor
# plugins / configuration dumps
Gem::Specification.find_all.select { |x| x.name =~ /^fluent(d|-(plugin|mixin)-.*)$/ }.each do |spec|
$log.info("gem '#{spec.name}' version '#{spec.version}'")
end
end
end

private

def create_socket_manager
Expand All @@ -586,7 +604,7 @@ def dry_run
Fluent::Engine.dry_run_mode = true
change_privilege
MessagePackFactory.init
init_engine(supervisor: true)
init_engine
run_configure
rescue Fluent::ConfigError => e
$log.error "config error", file: @config_path, error: e
Expand All @@ -607,7 +625,7 @@ def supervise
# Make dumpable conf, which is set corresponding_proxies for all elements in all worker sections
dry_run

Process.setproctitle("supervisor:#{@process_name}") if @process_name
Process.setproctitle("supervisor:#{@system_config.process_name}") if @system_config.process_name
$log.info "starting fluentd-#{Fluent::VERSION}", pid: Process.pid, ruby: RUBY_VERSION

rubyopt = ENV["RUBYOPT"]
Expand All @@ -619,24 +637,24 @@ def supervise

$log.info "spawn command to main: ", cmdline: fluentd_spawn_cmd

params = {}
params['main_cmd'] = fluentd_spawn_cmd
params['daemonize'] = @daemonize
params['inline_config'] = @inline_config
params['log_path'] = @log_path
params['log_rotate_age'] = @log_rotate_age
params['log_rotate_size'] = @log_rotate_size
params['chuser'] = @chuser
params['chgroup'] = @chgroup
params['use_v1_config'] = @use_v1_config
params['conf_encoding'] = @conf_encoding

# system config parameters
params['workers'] = @workers
params['root_dir'] = @root_dir
params['log_level'] = @log_level
params['suppress_repeated_stacktrace'] = @suppress_repeated_stacktrace
params['signame'] = @signame
params = {
'main_cmd' => fluentd_spawn_cmd,
'daemonize' => @daemonize,
'inline_config' => @inline_config,
'log_path' => @log_path,
'log_rotate_age' => @log_rotate_age,
'log_rotate_size' => @log_rotate_size,
'chuser' => @chuser,
'chgroup' => @chgroup,
'use_v1_config' => @use_v1_config,
'conf_encoding' => @conf_encoding,
'signame' => @signame,

'workers' => @system_config.workers,
'root_dir' => @system_config.root_dir,
'log_level' => @system_config.log_level,
'suppress_repeated_stacktrace' => @system_config.suppress_repeated_stacktrace,
}

se = ServerEngine.create(ServerModule, WorkerModule){
Fluent::Supervisor.load_config(@config_path, params)
Expand Down Expand Up @@ -721,18 +739,18 @@ def logging_with_console_output
unless @log.stdout?
logger = ServerEngine::DaemonLogger.new(STDOUT)
log = Fluent::Log.new(logger)
log.level = @log_level
log.level = @system_config.log_level
console = log.enable_debug
yield console
end
end

def main_process(&block)
if @process_name
if @workers > 1
Process.setproctitle("worker:#{@process_name}#{ENV['SERVERENGINE_WORKER_ID']}")
if @system_config.process_name
if @system_config.workers > 1
Process.setproctitle("worker:#{@system_config.process_name}#{ENV['SERVERENGINE_WORKER_ID']}")
else
Process.setproctitle("worker:#{@process_name}")
Process.setproctitle("worker:#{@system_config.process_name}")
end
end

Expand Down Expand Up @@ -781,39 +799,33 @@ def read_config
elsif @inline_config
config_data << "\n" << @inline_config.gsub("\\n","\n")
end
@conf = Fluent::Config.parse(config_data, config_fname, config_basedir, @use_v1_config)
Fluent::Config.parse(config_data, config_fname, config_basedir, @use_v1_config)
end

def set_system_config
@system_config = SystemConfig.create(@conf) # @conf is set in read_config
@system_config.attach(self)
@system_config.apply(self)
def build_system_config(conf)
system_config = SystemConfig.create(conf)
opt = {}
Fluent::SystemConfig::SYSTEM_CONFIG_PARAMETERS.each do |param|
if @cl_opt.key?(param) && !@cl_opt[param].nil?
if param == :log_level && opt[:log_level] == Fluent::Log::LEVEL_INFO
# info level can't be specified via command line option.
# log_level is info here, it is default value and <system>'s log_level should be applied if exists.
next
end

opt[param] = @cl_opt[param]
end
end
system_config.overwrite_variables(opt)
system_config
end

def change_privilege
ServerEngine::Privilege.change(@chuser, @chgroup)
end

def init_engine(supervisor: false)
def init_engine
Fluent::Engine.init(@system_config)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

init_engine is now only one line. Can we replace init_engine with Fluent::Engine.init?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left it a method because others such as Supervisor#change_privilege, Supervisor#run_configure, Supervisor#run_engine are the same situation.
Do you think it's better to change these methods at this time? (I don't have a strong opinion about it. so if you have any opinion, I follow yours)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So it should be in other patch for refactor methods.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'll do it 👍


@libs.each {|lib|
require lib
}

@plugin_dirs.each {|dir|
if Dir.exist?(dir)
dir = File.expand_path(dir)
Fluent::Engine.add_plugin_dir(dir)
end
}

if supervisor
# plugins / configuration dumps
Gem::Specification.find_all.select { |x| x.name =~ /^fluent(d|-(plugin|mixin)-.*)$/ }.each do |spec|
$log.info("gem '#{spec.name}' version '#{spec.version}'")
end
end
end

def run_configure
Expand Down
Loading