From c1170cc0b83ec6c7875ee24abfe0b2dac46442f7 Mon Sep 17 00:00:00 2001 From: RittaNarita Date: Wed, 11 May 2016 01:44:30 +0900 Subject: [PATCH 1/5] Add ServerEngine --- fluentd.gemspec | 3 +- lib/fluent/command/fluentd.rb | 7 +- lib/fluent/daemon.rb | 15 + lib/fluent/log.rb | 50 ++- lib/fluent/plugin/socket_util.rb | 2 +- lib/fluent/supervisor.rb | 704 +++++++++++++------------------ 6 files changed, 369 insertions(+), 412 deletions(-) create mode 100644 lib/fluent/daemon.rb diff --git a/fluentd.gemspec b/fluentd.gemspec index 3bae4edc57..d585392892 100644 --- a/fluentd.gemspec +++ b/fluentd.gemspec @@ -22,7 +22,8 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency("msgpack", [">= 0.7.0"]) gem.add_runtime_dependency("json", [">= 1.4.3"]) gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"]) - gem.add_runtime_dependency("cool.io", [">= 1.4.1", "< 2.0.0"]) + gem.add_runtime_dependency("cool.io", [">= 1.4.3", "< 2.0.0"]) + gem.add_runtime_dependency("serverengine", [">= 1.6.3"]) gem.add_runtime_dependency("http_parser.rb", [">= 0.5.1", "< 0.7.0"]) gem.add_runtime_dependency("sigdump", ["~> 0.2.2"]) gem.add_runtime_dependency("tzinfo", [">= 1.0.0"]) diff --git a/lib/fluent/command/fluentd.rb b/lib/fluent/command/fluentd.rb index 1be139838e..6f497607d2 100644 --- a/lib/fluent/command/fluentd.rb +++ b/lib/fluent/command/fluentd.rb @@ -260,4 +260,9 @@ exit 0 if early_exit require 'fluent/supervisor' -Fluent::Supervisor.new(opts).start +if opts[:supervise] + Fluent::Supervisor.new(opts).run_supervisor +else + Fluent::Supervisor.new(opts).run_worker +end + diff --git a/lib/fluent/daemon.rb b/lib/fluent/daemon.rb new file mode 100644 index 0000000000..055f4a2a35 --- /dev/null +++ b/lib/fluent/daemon.rb @@ -0,0 +1,15 @@ +#!/usr/bin/env ruby +# -*- coding: utf-8 -*- + +here = File.dirname(__FILE__) +$LOAD_PATH << File.expand_path(File.join(here, '..')) + +require 'serverengine' +require 'fluent/supervisor' + +server_module = Fluent.const_get(ARGV[0]) +worker_module = Fluent.const_get(ARGV[1]) +# it doesn't call ARGV in block because when reloading config, params will be initialized and then it can't use previous config. +config_path = ARGV[2] +params = JSON.parse(ARGV[3]) +ServerEngine::Daemon.run_server(server_module, worker_module) { Fluent::Supervisor.load_config(config_path, params) } diff --git a/lib/fluent/log.rb b/lib/fluent/log.rb index e29e500c8e..5a22bec904 100644 --- a/lib/fluent/log.rb +++ b/lib/fluent/log.rb @@ -53,9 +53,34 @@ def self.str_to_level(log_level_str) end end - def initialize(out=STDERR, level=LEVEL_TRACE, opts={}) - @out = out - @level = level + def initialize(logger, opts={}) + # overwrites logger.level= so that config reloading resets level of Fluentd::Log + orig_logger_level_setter = logger.class.public_instance_method(:level=).bind(logger) + me = self + # The original ruby logger sets the number as each log level like below. + # DEBUG = 0 + # INFO = 1 + # WARN = 2 + # ERROR = 3 + # FATAL = 4 + # Serverengine use this original log number. In addition to this, serverengine sets -1 as TRACE level. + # TRACE = -1 + # + # On the other hand, in fluentd side, it sets the number like below. + # TRACE = 0 + # DEBUG = 1 + # INFO = 2 + # WARN = 3 + # ERROR = 4 + # FATAL = 5 + # + # Then fluentd's level is set as serverengine's level + 1. + # So if serverengine's logger level is changed, fluentd's log level will be changed to that + 1. + logger.define_singleton_method(:level=) {|level| orig_logger_level_setter.call(level); me.level = self.level + 1 } + + @logger = logger + @out = logger.instance_variable_get(:@logdev) + @level = logger.level + 1 @debug_mode = false @self_event = false @tag = 'fluent' @@ -74,7 +99,10 @@ def initialize(out=STDERR, level=LEVEL_TRACE, opts={}) end def dup - clone = self.class.new(@out, @level, suppress_repeated_stacktrace: @suppress_repeated_stacktrace) + dl_opts = {} + dl_opts[:log_level] = @level - 1 + logger = ServerEngine::DaemonLogger.new(@out, dl_opts) + clone = self.class.new(logger, suppress_repeated_stacktrace: @suppress_repeated_stacktrace) clone.tag = @tag clone.time_format = @time_format # optional headers/attrs are not copied, because new PluginLogger should have another one of it @@ -87,6 +115,18 @@ def dup attr_accessor :time_format attr_accessor :optional_header, :optional_attrs + def logdev=(logdev) + @out = logdev + @logger.instance_variable_set(:@logdev, logdev) + nil + end + + def reopen! + # do noting in @logger.reopen! because it's already reopened in Supervisor.load_config + @logger.reopen! if @logger + nil + end + def enable_debug(b=true) @debug_mode = b self @@ -238,7 +278,7 @@ def fatal_backtrace(backtrace=$!.backtrace) end def puts(msg) - @out.puts(msg) + @logger << msg + "\n" @out.flush msg rescue diff --git a/lib/fluent/plugin/socket_util.rb b/lib/fluent/plugin/socket_util.rb index 83f2e00355..eb61874412 100644 --- a/lib/fluent/plugin/socket_util.rb +++ b/lib/fluent/plugin/socket_util.rb @@ -125,7 +125,7 @@ def start def shutdown @loop.watchers.each { |w| w.detach } - @loop.stop + @loop.stop if @loop.instance_variable_get("@running") @handler.close @thread.join end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 11a7004618..37546c62e3 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -24,6 +24,8 @@ require 'fluent/plugin' require 'fluent/rpc' require 'fluent/system_config' +require 'serverengine' +require 'shellwords' if Fluent.windows? require 'windows/library' @@ -35,21 +37,210 @@ end module Fluent - class Supervisor - def self.get_etc_passwd(user) - if user.to_i.to_s == user - Etc.getpwuid(user.to_i) - else - Etc.getpwnam(user) + module ServerModule + def before_run + @start_time = Time.now + + if config[:rpc_endpoint] + @rpc_endpoint = config[:rpc_endpoint] + @enable_get_dump = config[:enable_get_dump] + run_rpc_server end + install_supervisor_signal_handlers + + socket_manager_path = ServerEngine::SocketManager::Server.generate_path + ServerEngine::SocketManager::Server.open(socket_manager_path) + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s end - def self.get_etc_group(group) - if group.to_i.to_s == group - Etc.getgrgid(group.to_i) - else - Etc.getgrnam(group) + def after_run + if Time.now - @start_time < 1 + $log.warn "process died within 1 second. exit." + end + + stop_rpc_server if @rpc_endpoint + end + + def run_rpc_server + @rpc_server = RPC::Server.new(@rpc_endpoint, $log) + + # built-in RPC for signals + @rpc_server.mount_proc('/api/processes.interruptWorkers') { |req, res| + $log.debug "fluentd RPC got /api/processes.interruptWorkers request" + Process.kill :INT, $$ + nil + } + @rpc_server.mount_proc('/api/processes.killWorkers') { |req, res| + $log.debug "fluentd RPC got /api/processes.killWorkers request" + Process.kill :TERM, $$ + nil + } + @rpc_server.mount_proc('/api/plugins.flushBuffers') { |req, res| + $log.debug "fluentd RPC got /api/plugins.flushBuffers request" + unless Fluent.windows? + Process.kill :USR1, $$ + end + nil + } + @rpc_server.mount_proc('/api/config.reload') { |req, res| + $log.debug "fluentd RPC got /api/config.reload request" + if Fluent.windows? + # restart worker with auto restarting by killing + kill_worker + else + Process.kill :HUP, $$ + end + nil + } + @rpc_server.mount_proc('/api/config.dump') { |req, res| + $log.debug "fluentd RPC got /api/config.dump request" + $log.info "dump in-memory config" + supervisor_dump_config_handler + nil + } + + @rpc_server.mount_proc('/api/config.getDump') { |req, res| + $log.debug "fluentd RPC got /api/config.dump request" + $log.info "get dump in-memory config via HTTP" + res.body = supervisor_get_dump_config_handler + [nil, nil, res] + } if @enable_get_dump + + @rpc_server.start + end + + def stop_rpc_server + @rpc_server.shutdown + end + + def install_supervisor_signal_handlers + trap :USR1 do + $log.debug "fluentd supervisor process get SIGUSR1" + supervisor_sigusr1_handler + end unless Fluent.windows? + end + + def supervisor_sigusr1_handler + if log = config[:logger_initializer] + log.reopen! + end + + if pid = config[:worker_pid] + Process.kill(:USR1, pid) + # don't rescue Erro::ESRSH here (invalid status) + end + end + + def kill_worker + if pid = config[:worker_pid] + if Fluent.windows? + Process.kill :KILL, pid + else + Process.kill :INT, pid + end + end + end + + def supervisor_dump_config_handler + $log.info config[:fluentd_conf].to_s + end + + def supervisor_get_dump_config_handler + {conf: config[:fluentd_conf].to_s} + end + end + + module WorkerModule + def spawn(process_manager) + main_cmd = config[:main_cmd] + @pm = process_manager.spawn(main_cmd) + end + + def after_start + config[:worker_pid] = @pm.pid + end + end + + class Supervisor + def self.load_config(path, params = {}) + + pre_loadtime = 0 + pre_loadtime = params['pre_loadtime'].to_i if params['pre_loadtime'] + pre_config_mtime = nil + pre_config_mtime = params['pre_config_mtime'] if params['pre_config_mtime'] + config_mtime = File.mtime(path) + + # reuse previous config if last load time is within 5 seconds and mtime of the config file is not changed + if Time.now - Time.at(pre_loadtime) < 5 and config_mtime == pre_config_mtime + return params['pre_conf'] end + + config_fname = File.basename(path) + config_basedir = File.dirname(path) + config_data = File.read(path) + inline_config = params['inline_config'] + if inline_config == '-' + config_data << "\n" << STDIN.read + elsif inline_config + config_data << "\n" << inline_config.gsub("\\n","\n") + end + fluentd_conf = Fluent::Config.parse(config_data, config_fname, config_basedir, params['use_v1_config']) + system_config = SystemConfig.create(fluentd_conf) + + log_level = system_config.log_level || params['log_level'] + suppress_repeated_stacktrace = system_config.suppress_repeated_stacktrace || params['suppress_repeated_stacktrace'] + log_path = params['log_path'] + chuser = params['chuser'] + chgroup = params['chgroup'] + rpc_endpoint = system_config.rpc_endpoint + enable_get_dump = system_config.enable_get_dump + + log_opts = {suppress_repeated_stacktrace: suppress_repeated_stacktrace} + logger_initializer = Supervisor::LoggerInitializer.new(log_path, log_level, chuser, chgroup, log_opts) + # this #init sets initialized logger to $log + logger_initializer.init + logger = $log + + daemonize = params.fetch('daemonize', false) + main_cmd = params['main_cmd'] + + se_config = { + worker_type: 'spawn', + workers: 1, + log_stdin: false, + log_stdout: false, + log_stderr: false, + enable_heartbeat: true, + auto_heartbeat: false, + logger: logger, + log: logger.out, + log_path: log_path, + log_level: log_level, + logger_initializer: logger_initializer, + chuser: chuser, + chgroup: chgroup, + suppress_repeated_stacktrace: suppress_repeated_stacktrace, + daemonize: daemonize, + rpc_endpoint: rpc_endpoint, + enable_get_dump: enable_get_dump, + windows_daemon_cmdline: [ServerEngine.ruby_bin_path, + File.join(File.dirname(__FILE__), 'daemon.rb'), + ServerModule.name, + WorkerModule.name, + path, + JSON.dump(params)], + fluentd_conf: fluentd_conf, + main_cmd: main_cmd, + } + pre_params = params.dup + params['pre_loadtime'] = Time.now.to_i + params['pre_config_mtime'] = config_mtime + params['pre_conf'] = se_config + # prevent pre_conf from being too big by reloading many times. + pre_params['pre_conf'] = nil + params['pre_conf'][:windows_daemon_cmdline][5] = JSON.dump(pre_params) + + return se_config end class LoggerInitializer @@ -65,15 +256,19 @@ def init if @path && @path != "-" @io = File.open(@path, "a") if @chuser || @chgroup - chuid = @chuser ? Supervisor.get_etc_passwd(@chuser).uid : nil - chgid = @chgroup ? Supervisor.get_etc_group(@chgroup).gid : nil + chuid = @chuser ? ServerEngine::Daemon.get_etc_passwd(@chuser).uid : nil + chgid = @chgroup ? ServerEngine::Daemon.get_etc_group(@chgroup).gid : nil File.chown(chuid, chgid, @path) end else @io = STDOUT end - $log = Fluent::Log.new(@io, @level, @opts) + dl_opts = {} + # subtract 1 to match serverengine daemon logger side logging severity. + dl_opts[:log_level] = @level - 1 + logger = ServerEngine::DaemonLogger.new(@io, dl_opts) + $log = Fluent::Log.new(logger, @opts) $log.enable_color(false) if @path $log.enable_debug if @level <= Fluent::Log::LEVEL_DEBUG end @@ -138,60 +333,20 @@ def initialize(opt) @without_source = opt[:without_source] @signame = opt[:signame] - if Fluent.windows? - ruby_path = "\0" * 256 - GetModuleFileName.call(0,ruby_path,256) - ruby_path = ruby_path.rstrip.gsub(/\\/, '/') - @rubybin_dir = ruby_path[0, ruby_path.rindex("/")] - @winosvi = windows_version - end - log_opts = {suppress_repeated_stacktrace: opt[:suppress_repeated_stacktrace]} + @suppress_repeated_stacktrace = opt[:suppress_repeated_stacktrace] + log_opts = {suppress_repeated_stacktrace: @suppress_repeated_stacktrace} @log = LoggerInitializer.new(@log_path, @log_level, @chuser, @chgroup, log_opts) @finished = false - @main_pid = nil end - def start + def run_supervisor @log.init show_plugin_config if @show_plugin_config read_config set_system_config dry_run if @dry_run - start_daemonize if @daemonize - setup_rpc_server if @rpc_endpoint - setup_rpc_get_dump if @enable_get_dump - - if @supervise - install_supervisor_signal_handlers - run_rpc_server if @rpc_endpoint - until @finished - supervise do - change_privilege - init_engine - install_main_process_signal_handlers - run_configure - finish_daemonize if @daemonize - run_engine - exit 0 - end - $log.error "fluentd main process died unexpectedly. restarting." unless @finished - end - else - $log.info "starting fluentd-#{Fluent::VERSION} without supervision" - run_rpc_server if @rpc_endpoint - main_process do - change_privilege - init_engine - install_main_process_signal_handlers - install_main_process_winsigint_handler if Fluent.windows? - run_configure - finish_daemonize if @daemonize - run_engine - exit 0 - end - end - stop_rpc_server if @rpc_endpoint + supervise end def options @@ -203,14 +358,33 @@ def options } end + def run_worker + @log.init + Process.setproctitle("worker:#{@process_name}") if @process_name + + show_plugin_config if @show_plugin_config + read_config + set_system_config + + install_main_process_signal_handlers + + $log.info "starting fluentd-#{Fluent::VERSION} without supervision" + + main_process do + change_privilege + init_engine + run_configure + run_engine + exit 0 + end + end + private def dry_run $log.info "starting fluentd-#{Fluent::VERSION} as dry run mode" - change_privilege init_engine - install_main_process_signal_handlers run_configure exit 0 rescue => e @@ -243,156 +417,73 @@ def show_plugin_config exit 1 end - def start_daemonize - @wait_daemonize_pipe_r, @wait_daemonize_pipe_w = IO.pipe - - if fork - # console process - @wait_daemonize_pipe_w.close - @wait_daemonize_pipe_w = nil - wait_daemonize - exit 0 - end - - # daemonize intermediate process - @wait_daemonize_pipe_r.close - @wait_daemonize_pipe_r = nil - - # in case the child process forked during run_configure - @wait_daemonize_pipe_w.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) - - Process.setsid - exit!(0) if fork - File.umask(0) - - # supervisor process - @supervisor_pid = Process.pid - end + def supervise + $log.info "starting fluentd-#{Fluent::VERSION}" - def wait_daemonize - supervisor_pid = @wait_daemonize_pipe_r.read - if supervisor_pid.empty? - # initialization failed - exit! 1 + if Fluent.windows? + fluentd_spawn_cmd = ServerEngine.ruby_bin_path + ' "' + $0.gsub('"', '""') + '" ' + $fluentdargv.each{|a| + fluentd_spawn_cmd << ('"' + a.gsub('"', '""') + '" ') + } + else + fluentd_spawn_cmd = $0.shellescape + ' ' + $fluentdargv.each{|a| + fluentd_spawn_cmd << (a.shellescape + " ") + } end - @wait_daemonize_pipe_r.close - @wait_daemonize_pipe_r = nil - - # write pid file - File.open(@daemonize, "w") {|f| - f.write supervisor_pid + fluentd_spawn_cmd << ("--no-supervisor") + $log.info "spawn command to main: " + fluentd_spawn_cmd + + params = {} + params['main_cmd'] = fluentd_spawn_cmd + params['daemonize'] = @daemonize + params['inline_config'] = @inline_config + params['log_path'] = @log_path + params['log_level'] = @log_level + params['chuser'] = @chuser + params['chgroup'] = @chgroup + params['use_v1_config'] = @use_v1_config + params['suppress_repeated_stacktrace'] = @suppress_repeated_stacktrace + + se = ServerEngine.create(ServerModule, WorkerModule){ + Fluent::Supervisor.load_config(@config_path, params) } + se.run end - def finish_daemonize - if @wait_daemonize_pipe_w - STDIN.reopen("/dev/null") - STDOUT.reopen("/dev/null", "w") - STDERR.reopen("/dev/null", "w") - @wait_daemonize_pipe_w.write @supervisor_pid.to_s - @wait_daemonize_pipe_w.close - @wait_daemonize_pipe_w = nil + def install_main_process_signal_handlers + # When user use Ctrl + C not SIGINT, SIGINT is sent to all process in same process group. + # Then serverengine can't handle signal, so need to handle it in this process. + trap :INT do + $log.debug "fluentd main process get SIGINT" + unless @finished + @finished = true + $log.debug "getting start to shutdown main process" + Fluent::Engine.stop + end end - end - - def setup_rpc_server - @rpc_server = RPC::Server.new(@rpc_endpoint, $log) - - # built-in RPC for signals - @rpc_server.mount_proc('/api/processes.interruptWorkers') { |req, res| - $log.debug "fluentd RPC got /api/processes.interruptWorkers request" - supervisor_sigint_handler - nil - } - @rpc_server.mount_proc('/api/processes.killWorkers') { |req, res| - $log.debug "fluentd RPC got /api/processes.killWorkers request" - supervisor_sigterm_handler - nil - } - @rpc_server.mount_proc('/api/plugins.flushBuffers') { |req, res| - $log.debug "fluentd RPC got /api/plugins.flushBuffers request" - supervisor_sigusr1_handler - nil - } - @rpc_server.mount_proc('/api/config.reload') { |req, res| - $log.debug "fluentd RPC got /api/config.reload request" - $log.info "restarting" - supervisor_sighup_handler - nil - } - @rpc_server.mount_proc('/api/config.dump') { |req, res| - $log.debug "fluentd RPC got /api/config.dump request" - $log.info "dump in-memory config" - supervisor_dump_config_handler - nil - } - end - - def setup_rpc_get_dump - @rpc_server.mount_proc('/api/config.getDump') { |req, res| - $log.debug "fluentd RPC got /api/config.dump request" - $log.info "get dump in-memory config via HTTP" - res.body = supervisor_get_dump_config_handler - [nil, nil, res] - } - end - - def run_rpc_server - @rpc_server.start - end - def stop_rpc_server - @rpc_server.shutdown + trap :USR1 do + flush_buffer + end unless Fluent.windows? end - def supervise(&block) - start_time = Time.now - - Process.setproctitle("supervisor:#{@process_name}") if @process_name - $log.info "starting fluentd-#{Fluent::VERSION}" + def flush_buffer + $log.debug "fluentd main process get SIGUSR1" + $log.info "force flushing buffered events" + @log.reopen! - if !Fluent.windows? - @main_pid = fork do - main_process(&block) - end - else - if @supervise - fluentd_spawn_cmd = @rubybin_dir+"/ruby.exe '"+@rubybin_dir+"/fluentd' " - $fluentdargv.each{|a| - fluentd_spawn_cmd << (a + " ") - } - fluentd_spawn_cmd << ("--no-supervisor") - $log.info "spawn command to main (windows) : " + fluentd_spawn_cmd - @main_pid = Process.spawn(fluentd_spawn_cmd) - else - main_process(&block) + # Creating new thread due to mutex can't lock + # in main thread during trap context + Thread.new { + begin + Fluent::Engine.flush! + $log.debug "flushing thread: flushed" + rescue Exception => e + $log.warn "flushing thread error: #{e}" end - end - - if @daemonize && @wait_daemonize_pipe_w - STDIN.reopen("/dev/null") - STDOUT.reopen("/dev/null", "w") - STDERR.reopen("/dev/null", "w") - @wait_daemonize_pipe_w.close - @wait_daemonize_pipe_w = nil - end - - Process.waitpid(@main_pid) - @main_pid = nil - ecode = $?.to_i - - if Fluent.windows? - @th_sv.kill if @th_sv.alive? - @th_sv.join rescue nil - end - - $log.info "process finished", code: ecode - - if !@finished && Time.now - start_time < 1 - $log.warn "process died within 1 second. exit." - exit ecode - end + }.run end def main_process(&block) @@ -400,15 +491,14 @@ def main_process(&block) begin block.call - if Fluent.windows? - @th_ma.join - end - rescue Fluent::ConfigError $log.error "config error", file: @config_path, error: $!.to_s $log.debug_backtrace unless @log.stdout? - console = Fluent::Log.new(STDOUT, @log_level).enable_debug + logger = ServerEngine::DaemonLogger.new(STDOUT) + log = Fluent::Log.new(logger) + log.level = @log_level + console = log.enable_debug console.error "config error", file: @config_path, error: $!.to_s console.debug_backtrace end @@ -417,7 +507,10 @@ def main_process(&block) $log.error "unexpected error", error: $!.to_s $log.error_backtrace unless @log.stdout? - console = Fluent::Log.new(STDOUT, @log_level).enable_debug + logger = ServerEngine::DaemonLogger.new(STDOUT) + log = Fluent::Log.new(logger) + log.level = @log_level + console = log.enable_debug console.error "unexpected error", error: $!.to_s console.error_backtrace end @@ -426,96 +519,6 @@ def main_process(&block) exit! 1 end - def install_supervisor_signal_handlers - install_supervisor_winsigint_handler - - trap :INT do - $log.debug "fluentd supervisor process get SIGINT" - supervisor_sigint_handler - end - - trap :TERM do - $log.debug "fluentd supervisor process get SIGTERM" - supervisor_sigterm_handler - end - - trap :HUP do - $log.debug "fluentd supervisor process get SIGHUP" - $log.info "restarting" - supervisor_sighup_handler - end unless Fluent.windows? - - trap :USR1 do - $log.debug "fluentd supervisor process get SIGUSR1" - supervisor_sigusr1_handler - end unless Fluent.windows? - end - - def supervisor_sigint_handler - @finished = true - unless Fluent.windows? - if pid = @main_pid - # kill processes only still exists - unless Process.waitpid(pid, Process::WNOHANG) - begin - Process.kill(:INT, pid) - rescue Errno::ESRCH - # ignore processes already died - end - end - end - else - begin - @evtend.set - rescue - # nothing to do. - end - end - end - - def supervisor_sigterm_handler - @finished = true - if pid = @main_pid - # kill processes only still exists - unless Process.waitpid(pid, Process::WNOHANG) - begin - Process.kill(:TERM, pid) - rescue Errno::ESRCH - # ignore processes already died - end - end - end - end - - def supervisor_sighup_handler - # Creating new thread due to mutex can't lock - # in main thread during trap context - Thread.new { - read_config - set_system_config - if pid = @main_pid - Process.kill(:TERM, pid) - # don't resuce Erro::ESRSH here (invalid status) - end - }.run - end - - def supervisor_sigusr1_handler - @log.reopen! - if pid = @main_pid - Process.kill(:USR1, pid) - # don't resuce Erro::ESRSH here (invalid status) - end - end - - def supervisor_dump_config_handler - $log.info @conf.to_s - end - - def supervisor_get_dump_config_handler - {conf: @conf.to_s} - end - def read_config $log.info "reading config file", path: @config_path @config_fname = File.basename(@config_path) @@ -534,25 +537,8 @@ def set_system_config @system_config.apply(self) end - def run_configure - Fluent::Engine.run_configure(@conf) - end - def change_privilege - if @chgroup - etc_group = Supervisor.get_etc_group(@chgroup) - Process::GID.change_privilege(etc_group.gid) - end - - if @chuser - etc_pw = Supervisor.get_etc_passwd(@chuser) - user_groups = [etc_pw.gid] - Etc.setgrent - Etc.group { |gr| user_groups << gr.gid if gr.mem.include?(etc_pw.name) } # emulate 'id -G' - - Process.groups = Process.groups | user_groups - Process::UID.change_privilege(etc_pw.uid) - end + ServerEngine::Daemon.change_privilege(@chuser, @chgroup) end def init_engine @@ -570,102 +556,12 @@ def init_engine } end - def install_main_process_signal_handlers - # Strictly speaking, these signal handling is not thread safe. - # But enough safe to limit twice call of Fluent::Engine.stop. - - trap :INT do - $log.debug "fluentd main process get SIGINT" - unless Fluent.windows? - unless @finished - @finished = true - $log.debug "getting start to shutdown main process" - Fluent::Engine.stop - end - else - begin - @evtend.set - rescue - # nothing to do. - end - end - end - - trap :TERM do - $log.debug "fluentd main process get SIGTERM" - unless @finished - @finished = true - $log.debug "getting start to shutdown main process" - Fluent::Engine.stop - end - end - - trap :HUP do - # TODO - $log.debug "fluentd main process get SIGHUP" - end unless Fluent.windows? - - trap :USR1 do - $log.debug "fluentd main process get SIGUSR1" - $log.info "force flushing buffered events" - @log.reopen! - - # Creating new thread due to mutex can't lock - # in main thread during trap context - Thread.new { - begin - Fluent::Engine.flush! - $log.debug "flushing thread: flushed" - rescue Exception => e - $log.warn "flushing thread error: #{e}" - end - }.run - end unless Fluent.windows? + def run_configure + Fluent::Engine.run_configure(@conf) end def run_engine Fluent::Engine.run end - - def install_supervisor_winsigint_handler - @winintname = @signame || "fluentdwinsigint_#{Process.pid}" - @th_sv = Thread.new do - @evtend = Win32::Event.new(@winintname, true) - until @evtend.signaled? - sleep(1) - end - @evtend.close - - @finished = true - if pid = @main_pid - unless Process.waitpid(pid, Process::WNOHANG) - sigx = (@winosvi >= 6.2) ? (:INT) : (:KILL) - begin - Process.kill(sigx, pid) - rescue Errno::ESRCH - # ignore processes already died - end - end - end - end - end - - def install_main_process_winsigint_handler - @winintname = @signame || "fluentdwinsigint_#{Process.ppid}" - @th_ma = Thread.new do - @evtend = Win32::Event.open(@winintname) - until @evtend.signaled? - sleep(1) - end - @evtend.close - - unless @finished - @finished = true - $log.debug "getting start to shutdown main process" - Fluent::Engine.stop - end - end - $log.debug "install_main_process_winsigint_handler***** installed main winsiginthandler" - end end end From 83c711e403d60de37f765a792c30450b1ef981b2 Mon Sep 17 00:00:00 2001 From: RittaNarita Date: Wed, 11 May 2016 01:47:13 +0900 Subject: [PATCH 2/5] Use Socket Manager for input plugins --- lib/fluent/plugin/in_forward.rb | 16 +++++++++++----- lib/fluent/plugin/in_http.rb | 8 +++++++- lib/fluent/plugin/in_syslog.rb | 11 ++++++++--- lib/fluent/plugin/in_tcp.rb | 9 ++++++++- lib/fluent/plugin/in_udp.rb | 8 ++++++-- test/plugin/test_in_forward.rb | 12 ++++++++++++ test/plugin/test_in_http.rb | 12 ++++++++++++ test/plugin/test_in_syslog.rb | 12 ++++++++++++ test/plugin/test_in_tcp.rb | 12 ++++++++++++ test/plugin/test_in_udp.rb | 15 +++++++++++++++ 10 files changed, 103 insertions(+), 12 deletions(-) diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 2fbd52c271..221d03d5a4 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -57,11 +57,16 @@ def configure(conf) def start @loop = Coolio::Loop.new - @lsock = listen + socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] + if Fluent.windows? + socket_manager_path = socket_manager_path.to_i + end + client = ServerEngine::SocketManager::Client.new(socket_manager_path) + + @lsock = listen(client) @loop.attach(@lsock) - @usock = SocketUtil.create_udp_socket(@bind) - @usock.bind(@bind, @port) + @usock = client.listen_udp(@bind, @port) @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) @hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request)) @loop.attach(@hbr) @@ -86,9 +91,10 @@ def shutdown @lsock.close end - def listen + def listen(client) log.info "listening fluent socket on #{@bind}:#{@port}" - s = Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message)) + sock = client.listen_tcp(@bind, @port) + s = Coolio::TCPServer.new(sock, nil, Handler, @linger_timeout, log, method(:on_message)) s.listen(@backlog) unless @backlog.nil? s end diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index 5f50603587..dc33f11d4f 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -101,7 +101,13 @@ def on_timer def start log.debug "listening http on #{@bind}:#{@port}" - lsock = TCPServer.new(@bind, @port) + + socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] + if Fluent.windows? + socket_manager_path = socket_manager_path.to_i + end + client = ServerEngine::SocketManager::Client.new(socket_manager_path) + lsock = client.listen_tcp(@bind, @port) detach_multi_process do super diff --git a/lib/fluent/plugin/in_syslog.rb b/lib/fluent/plugin/in_syslog.rb index 206c8fbe4a..24e1b96f59 100644 --- a/lib/fluent/plugin/in_syslog.rb +++ b/lib/fluent/plugin/in_syslog.rb @@ -182,13 +182,18 @@ def receive_data(data, addr) def listen(callback) log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type}" + socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] + if Fluent.windows? + socket_manager_path = socket_manager_path.to_i + end + client = ServerEngine::SocketManager::Client.new(socket_manager_path) if @protocol_type == :udp - @usock = SocketUtil.create_udp_socket(@bind) - @usock.bind(@bind, @port) + @usock = client.listen_udp(@bind, @port) SocketUtil::UdpHandler.new(@usock, log, 2048, callback) else # syslog family add "\n" to each message and this seems only way to split messages in tcp stream - Coolio::TCPServer.new(@bind, @port, SocketUtil::TcpHandler, log, "\n", callback) + lsock = client.listen_tcp(@bind, @port) + Coolio::TCPServer.new(lsock, nil, SocketUtil::TcpHandler, log, "\n", callback) end end diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb index cfac648bf9..addfc604c4 100644 --- a/lib/fluent/plugin/in_tcp.rb +++ b/lib/fluent/plugin/in_tcp.rb @@ -28,7 +28,14 @@ class TcpInput < SocketUtil::BaseInput def listen(callback) log.info "listening tcp socket on #{@bind}:#{@port}" - Coolio::TCPServer.new(@bind, @port, SocketUtil::TcpHandler, log, @delimiter, callback) + + socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] + if Fluent.windows? + socket_manager_path = socket_manager_path.to_i + end + client = ServerEngine::SocketManager::Client.new(socket_manager_path) + lsock = client.listen_tcp(@bind, @port) + Coolio::TCPServer.new(lsock, nil, SocketUtil::TcpHandler, log, @delimiter, callback) end end end diff --git a/lib/fluent/plugin/in_udp.rb b/lib/fluent/plugin/in_udp.rb index 5fe9dd90e8..14f08fc574 100644 --- a/lib/fluent/plugin/in_udp.rb +++ b/lib/fluent/plugin/in_udp.rb @@ -25,8 +25,12 @@ class UdpInput < SocketUtil::BaseInput def listen(callback) log.info "listening udp socket on #{@bind}:#{@port}" - @usock = SocketUtil.create_udp_socket(@bind) - @usock.bind(@bind, @port) + socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] + if Fluent.windows? + socket_manager_path = socket_manager_path.to_i + end + client = ServerEngine::SocketManager::Client.new(socket_manager_path) + @usock = client.listen_udp(@bind, @port) SocketUtil::UdpHandler.new(@usock, log, @body_size_limit, callback) end end diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index fcf8c9a50d..3a940e856e 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -7,6 +7,18 @@ require 'fluent/plugin/in_forward' class ForwardInputTest < Test::Unit::TestCase + class << self + def startup + socket_manager_path = ServerEngine::SocketManager::Server.generate_path + @server = ServerEngine::SocketManager::Server.open(socket_manager_path) + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s + end + + def shutdown + @server.close + end + end + def setup Fluent::Test.setup @responses = [] # for testing responses after sending data diff --git a/test/plugin/test_in_http.rb b/test/plugin/test_in_http.rb index b0ec859643..7e7265893f 100644 --- a/test/plugin/test_in_http.rb +++ b/test/plugin/test_in_http.rb @@ -4,6 +4,18 @@ require 'net/http' class HttpInputTest < Test::Unit::TestCase + class << self + def startup + socket_manager_path = ServerEngine::SocketManager::Server.generate_path + @server = ServerEngine::SocketManager::Server.open(socket_manager_path) + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s + end + + def shutdown + @server.close + end + end + def setup Fluent::Test.setup end diff --git a/test/plugin/test_in_syslog.rb b/test/plugin/test_in_syslog.rb index 643add36f1..95f761622d 100755 --- a/test/plugin/test_in_syslog.rb +++ b/test/plugin/test_in_syslog.rb @@ -3,6 +3,18 @@ require 'fluent/plugin/in_syslog' class SyslogInputTest < Test::Unit::TestCase + class << self + def startup + socket_manager_path = ServerEngine::SocketManager::Server.generate_path + @server = ServerEngine::SocketManager::Server.open(socket_manager_path) + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s + end + + def shutdown + @server.close + end + end + def setup Fluent::Test.setup require 'fluent/plugin/socket_util' diff --git a/test/plugin/test_in_tcp.rb b/test/plugin/test_in_tcp.rb index a5cbfb79e7..c6416a1672 100755 --- a/test/plugin/test_in_tcp.rb +++ b/test/plugin/test_in_tcp.rb @@ -3,6 +3,18 @@ require 'fluent/plugin/in_tcp' class TcpInputTest < Test::Unit::TestCase + class << self + def startup + socket_manager_path = ServerEngine::SocketManager::Server.generate_path + @server = ServerEngine::SocketManager::Server.open(socket_manager_path) + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s + end + + def shutdown + @server.close + end + end + def setup Fluent::Test.setup end diff --git a/test/plugin/test_in_udp.rb b/test/plugin/test_in_udp.rb index 3d8f60db32..01c47c1b8a 100755 --- a/test/plugin/test_in_udp.rb +++ b/test/plugin/test_in_udp.rb @@ -3,6 +3,18 @@ require 'fluent/plugin/in_udp' class UdpInputTest < Test::Unit::TestCase + class << self + def startup + socket_manager_path = ServerEngine::SocketManager::Server.generate_path + @server = ServerEngine::SocketManager::Server.open(socket_manager_path) + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s + end + + def shutdown + @server.close + end + end + def setup Fluent::Test.setup end @@ -55,6 +67,7 @@ def test_time_format tests.each {|test| u.send(test['msg'], 0) } + u.close sleep 1 end @@ -63,6 +76,7 @@ def test_time_format assert_equal_event_time(tests[i]['expected'], emits[i][1]) } } + end { @@ -89,6 +103,7 @@ def test_time_format tests.each { |test| u.send(test['msg'], 0) } + u.close sleep 1 end From 31c00d02798b5d5a689af190222c63027ca0f4ba Mon Sep 17 00:00:00 2001 From: RittaNarita Date: Wed, 11 May 2016 01:51:44 +0900 Subject: [PATCH 3/5] Fix logger to use Damon Logger --- lib/fluent/command/debug.rb | 5 ++++- lib/fluent/test.rb | 7 ++++++- lib/fluent/test/base.rb | 7 ++++++- test/helper.rb | 7 ++++++- test/test_config.rb | 17 +++++++++++++---- 5 files changed, 35 insertions(+), 8 deletions(-) diff --git a/lib/fluent/command/debug.rb b/lib/fluent/command/debug.rb index 254d6b59f0..a5bc6f54ea 100644 --- a/lib/fluent/command/debug.rb +++ b/lib/fluent/command/debug.rb @@ -66,7 +66,10 @@ include Fluent::SystemConfig::Mixin -$log = Fluent::Log.new(STDERR, Fluent::Log::LEVEL_TRACE) +dl_opts = {} +dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE +logger = ServerEngine::DaemonLogger.new(STDERR, dl_opts) +$log = Fluent::Log.new(logger) Fluent::Engine.init(system_config) DRb::DRbObject.class_eval do diff --git a/lib/fluent/test.rb b/lib/fluent/test.rb index d34b4442f7..01f6ecbdd5 100644 --- a/lib/fluent/test.rb +++ b/lib/fluent/test.rb @@ -22,5 +22,10 @@ require 'fluent/test/filter_test' require 'fluent/test/parser_test' require 'fluent/test/formatter_test' +require 'serverengine' -$log ||= Fluent::Log.new(Fluent::Test::DummyLogDevice.new) +dl_opts = {} +dl_opts[:log_level] = ServerEngine::DaemonLogger::INFO +logdev = Fluent::Test::DummyLogDevice.new +logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) +$log ||= Fluent::Log.new(logger) \ No newline at end of file diff --git a/lib/fluent/test/base.rb b/lib/fluent/test/base.rb index ae3cce27a8..57305538c9 100644 --- a/lib/fluent/test/base.rb +++ b/lib/fluent/test/base.rb @@ -17,6 +17,7 @@ require 'fluent/engine' require 'fluent/system_config' require 'fluent/config' +require 'serverengine' module Fluent module Test @@ -119,7 +120,11 @@ def close class TestLogger < Fluent::PluginLogger def initialize @logdev = DummyLogDevice.new - super(Fluent::Log.new(@logdev)) + dl_opts = {} + dl_opts[:log_level] = ServerEngine::DaemonLogger::INFO + logger = ServerEngine::DaemonLogger.new(@logdev, dl_opts) + log = Fluent::Log.new(logger) + super(log) end def reset diff --git a/test/helper.rb b/test/helper.rb index 47b0f32aca..c132203a4d 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -47,6 +47,7 @@ def to_masked_element require 'fluent/plugin_id' require 'fluent/plugin_helper' require 'fluent/time' +require 'serverengine' module Fluent module Plugin @@ -104,4 +105,8 @@ def ipv6_enabled? end end -$log = Fluent::Log.new(Fluent::Test::DummyLogDevice.new, Fluent::Log::LEVEL_WARN) +dl_opts = {} +dl_opts[:log_level] = ServerEngine::DaemonLogger::WARN +logdev = Fluent::Test::DummyLogDevice.new +logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) +$log ||= Fluent::Log.new(logger) diff --git a/test/test_config.rb b/test/test_config.rb index e245519203..c9a93eeeb4 100644 --- a/test/test_config.rb +++ b/test/test_config.rb @@ -157,14 +157,23 @@ def write_config(path, data) end def test_inline - prepare_config + prepare_config opts = { - config_path: "#{TMP_DIR}/config_test_1.conf", - inline_config: "\n type http\n port 2222\n " + :config_path => "#{TMP_DIR}/config_test_1.conf", + :inline_config => "\n type http\n port 2222\n " } assert_nothing_raised do Fluent::Supervisor.new(opts) - end + end + create_warn_dummy_logger + end + + def create_warn_dummy_logger + dl_opts = {} + dl_opts[:log_level] = ServerEngine::DaemonLogger::WARN + logdev = Fluent::Test::DummyLogDevice.new + logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) + $log = Fluent::Log.new(logger) end end From 7ba4a11d0028e08e65b352a351ba566ba360d49c Mon Sep 17 00:00:00 2001 From: RittaNarita Date: Wed, 11 May 2016 01:52:56 +0900 Subject: [PATCH 4/5] Fix test_log to test Daemon Logger --- test/test_log.rb | 213 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 204 insertions(+), 9 deletions(-) diff --git a/test/test_log.rb b/test/test_log.rb index 7218d61057..f368e6464b 100644 --- a/test/test_log.rb +++ b/test/test_log.rb @@ -26,7 +26,43 @@ def teardown ) def test_output(data) log_level, start = data - log = Fluent::Log.new(@log_device, log_level) + logdev = @log_device + logger = ServerEngine::DaemonLogger.new(logdev) + log = Fluent::Log.new(logger) + log.level = log_level + log.trace "trace log" + log.debug "debug log" + log.info "info log" + log.warn "warn log" + log.error "error log" + log.fatal "fatal log" + expected = [ + "#{@timestamp_str} [trace]: trace log\n", + "#{@timestamp_str} [debug]: debug log\n", + "#{@timestamp_str} [info]: info log\n", + "#{@timestamp_str} [warn]: warn log\n", + "#{@timestamp_str} [error]: error log\n", + "#{@timestamp_str} [fatal]: fatal log\n" + ][start..-1] + assert_equal(expected, log.out.logs) + end + + data( + trace: [ServerEngine::DaemonLogger::TRACE, 0], + debug: [ServerEngine::DaemonLogger::DEBUG, 1], + info: [ServerEngine::DaemonLogger::INFO, 2], + warn: [ServerEngine::DaemonLogger::WARN, 3], + error: [ServerEngine::DaemonLogger::ERROR, 4], + fatal: [ServerEngine::DaemonLogger::FATAL, 5], + ) + def test_output_with_serverengine_loglevel(data) + log_level, start = data + + dl_opts = {} + dl_opts[:log_level] = log_level + logdev = @log_device + logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) + log = Fluent::Log.new(logger) log.trace "trace log" log.debug "debug log" log.info "info log" @@ -54,7 +90,44 @@ def test_output(data) ) def test_output_with_block(data) log_level, start = data - log = Fluent::Log.new(@log_device, log_level) + + logdev = @log_device + logger = ServerEngine::DaemonLogger.new(logdev) + log = Fluent::Log.new(logger) + log.level = log_level + log.trace { "trace log" } + log.debug { "debug log" } + log.info { "info log" } + log.warn { "warn log" } + log.error { "error log" } + log.fatal { "fatal log" } + expected = [ + "#{@timestamp_str} [trace]: trace log\n", + "#{@timestamp_str} [debug]: debug log\n", + "#{@timestamp_str} [info]: info log\n", + "#{@timestamp_str} [warn]: warn log\n", + "#{@timestamp_str} [error]: error log\n", + "#{@timestamp_str} [fatal]: fatal log\n" + ][start..-1] + assert_equal(expected, log.out.logs) + end + + data( + trace: [ServerEngine::DaemonLogger::TRACE, 0], + debug: [ServerEngine::DaemonLogger::DEBUG, 1], + info: [ServerEngine::DaemonLogger::INFO, 2], + warn: [ServerEngine::DaemonLogger::WARN, 3], + error: [ServerEngine::DaemonLogger::ERROR, 4], + fatal: [ServerEngine::DaemonLogger::FATAL, 5], + ) + def test_output_with_block_with_serverengine_loglevel(data) + log_level, start = data + + dl_opts = {} + dl_opts[:log_level] = log_level + logdev = @log_device + logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) + log = Fluent::Log.new(logger) log.trace { "trace log" } log.debug { "debug log" } log.info { "info log" } @@ -82,7 +155,42 @@ def test_output_with_block(data) ) def test_execute_block(data) log_level, expected = data - log = Fluent::Log.new(@log_device, log_level) + logdev = @log_device + logger = ServerEngine::DaemonLogger.new(logdev) + log = Fluent::Log.new(logger) + log.level = log_level + block_called = { + trace: false, + debug: false, + info: false, + warn: false, + error: false, + fatal: false, + } + log.trace { block_called[:trace] = true } + log.debug { block_called[:debug] = true } + log.info { block_called[:info] = true } + log.warn { block_called[:warn] = true } + log.error { block_called[:error] = true } + log.fatal { block_called[:fatal] = true } + assert_equal(expected, block_called) + end + + data( + trace: [ServerEngine::DaemonLogger::TRACE, { trace: true, debug: true, info: true, warn: true, error: true, fatal: true }], + debug: [ServerEngine::DaemonLogger::DEBUG, { trace: false, debug: true, info: true, warn: true, error: true, fatal: true }], + info: [ServerEngine::DaemonLogger::INFO, { trace: false, debug: false, info: true, warn: true, error: true, fatal: true }], + warn: [ServerEngine::DaemonLogger::WARN, { trace: false, debug: false, info: false, warn: true, error: true, fatal: true }], + error: [ServerEngine::DaemonLogger::ERROR, { trace: false, debug: false, info: false, warn: false, error: true, fatal: true }], + fatal: [ServerEngine::DaemonLogger::FATAL, { trace: false, debug: false, info: false, warn: false, error: false, fatal: true }], + ) + def test_execute_block_with_serverengine_loglevel(data) + log_level, expected = data + dl_opts = {} + dl_opts[:log_level] = log_level + logdev = @log_device + logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) + log = Fluent::Log.new(logger) block_called = { trace: false, debug: false, @@ -111,7 +219,55 @@ def test_execute_block(data) def test_backtrace(data) log_level, start = data backtrace = ["line 1", "line 2", "line 3"] - log = Fluent::Log.new(@log_device, log_level) + logdev = @log_device + logger = ServerEngine::DaemonLogger.new(logdev) + log = Fluent::Log.new(logger) + log.level = log_level + log.trace_backtrace(backtrace) + log.debug_backtrace(backtrace) + log.info_backtrace(backtrace) + log.warn_backtrace(backtrace) + log.error_backtrace(backtrace) + log.fatal_backtrace(backtrace) + expected = [ + " #{@timestamp_str} [trace]: line 1\n", + " #{@timestamp_str} [trace]: line 2\n", + " #{@timestamp_str} [trace]: line 3\n", + " #{@timestamp_str} [debug]: line 1\n", + " #{@timestamp_str} [debug]: line 2\n", + " #{@timestamp_str} [debug]: line 3\n", + " #{@timestamp_str} [info]: line 1\n", + " #{@timestamp_str} [info]: line 2\n", + " #{@timestamp_str} [info]: line 3\n", + " #{@timestamp_str} [warn]: line 1\n", + " #{@timestamp_str} [warn]: line 2\n", + " #{@timestamp_str} [warn]: line 3\n", + " #{@timestamp_str} [error]: line 1\n", + " #{@timestamp_str} [error]: line 2\n", + " #{@timestamp_str} [error]: line 3\n", + " #{@timestamp_str} [fatal]: line 1\n", + " #{@timestamp_str} [fatal]: line 2\n", + " #{@timestamp_str} [fatal]: line 3\n" + ][start..-1] + assert_equal(expected, log.out.logs) + end + + data( + trace: [ServerEngine::DaemonLogger::TRACE, 0], + debug: [ServerEngine::DaemonLogger::DEBUG, 3], + info: [ServerEngine::DaemonLogger::INFO, 6], + warn: [ServerEngine::DaemonLogger::WARN, 9], + error: [ServerEngine::DaemonLogger::ERROR, 12], + fatal: [ServerEngine::DaemonLogger::FATAL, 15], + ) + def test_backtrace_with_serverengine_loglevel(data) + log_level, start = data + backtrace = ["line 1", "line 2", "line 3"] + dl_opts = {} + dl_opts[:log_level] = log_level + logdev = @log_device + logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) + log = Fluent::Log.new(logger) log.trace_backtrace(backtrace) log.debug_backtrace(backtrace) log.info_backtrace(backtrace) @@ -145,7 +301,13 @@ def test_backtrace(data) sub_test_case "suppress repeated backtrace" do def test_same_log_level backtrace = ["line 1", "line 2", "line 3"] - log = Fluent::Log.new(@log_device, Fluent::Log::LEVEL_TRACE, suppress_repeated_stacktrace: true) + dl_opts = {} + dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE + logdev = @log_device + logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) + opts = {} + opts[:suppress_repeated_stacktrace] = true + log = Fluent::Log.new(logger, opts) log.trace_backtrace(backtrace) log.trace_backtrace(backtrace) log.trace_backtrace(backtrace + ["line 4"]) @@ -170,7 +332,13 @@ def test_same_log_level def test_different_log_level backtrace = ["line 1", "line 2", "line 3"] - log = Fluent::Log.new(@log_device, Fluent::Log::LEVEL_TRACE, suppress_repeated_stacktrace: true) + dl_opts = {} + dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE + logdev = @log_device + logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) + opts = {} + opts[:suppress_repeated_stacktrace] = true + log = Fluent::Log.new(logger, opts) log.trace_backtrace(backtrace) log.debug_backtrace(backtrace) log.info_backtrace(backtrace) @@ -192,7 +360,11 @@ def test_different_log_level end def test_dup - log1 = Fluent::Log.new(@log_device, Fluent::Log::LEVEL_TRACE) + dl_opts = {} + dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE + logdev = @log_device + logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) + log1 = Fluent::Log.new(logger) log2 = log1.dup log1.level = Fluent::Log::LEVEL_DEBUG original_tag = log1.tag @@ -204,13 +376,32 @@ def test_dup end def test_disable_events - log = Fluent::Log.new(@log_device, Fluent::Log::LEVEL_TRACE) + dl_opts = {} + dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE + logdev = @log_device + logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) + log = Fluent::Log.new(logger) engine = log.instance_variable_get("@engine") mock(engine).push_log_event(anything, anything, anything).once log.trace "trace log" log.disable_events(Thread.current) log.trace "trace log" end + + def test_level_reload + dl_opts = {} + dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE + logdev = @log_device + logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) + log = Fluent::Log.new(logger) + assert_equal(ServerEngine::DaemonLogger::TRACE, logger.level) + assert_equal(Fluent::Log::LEVEL_TRACE, log.level) + # change daemon logger side level + logger.level = ServerEngine::DaemonLogger::DEBUG + assert_equal(ServerEngine::DaemonLogger::DEBUG, logger.level) + # check fluentd log side level is also changed + assert_equal(Fluent::Log::LEVEL_DEBUG, log.level) + end end class PluginLoggerTest < Test::Unit::TestCase @@ -219,7 +410,11 @@ def setup @timestamp = Time.parse("2016-04-21 11:58:41 +0900") @timestamp_str = @timestamp.strftime("%Y-%m-%d %H:%M:%S %z") stub(Time).now { @timestamp } - @logger = Fluent::Log.new(@log_device, Fluent::Log::LEVEL_TRACE) + dl_opts = {} + dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE + logdev = @log_device + logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) + @logger = Fluent::Log.new(logger) end def teardown From 87ef5f3806b6e1c75fa8026bc3a6fe2d5cc44379 Mon Sep 17 00:00:00 2001 From: RittaNarita Date: Wed, 11 May 2016 01:53:30 +0900 Subject: [PATCH 5/5] Add test for supervisor --- test/test_supervisor.rb | 267 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 267 insertions(+) create mode 100644 test/test_supervisor.rb diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb new file mode 100644 index 0000000000..e0f9bdb902 --- /dev/null +++ b/test/test_supervisor.rb @@ -0,0 +1,267 @@ +require_relative 'helper' +require 'fluent/event_router' +require 'fluent/system_config' +require 'fluent/supervisor' +require_relative 'test_plugin_classes' + +require 'net/http' +require 'uri' + +class SupervisorTest < ::Test::Unit::TestCase + include Fluent + include FluentTest + include ServerModule + include WorkerModule + + TMP_DIR = File.dirname(__FILE__) + "/tmp/config#{ENV['TEST_ENV_NUMBER']}" + + def write_config(path, data) + FileUtils.mkdir_p(File.dirname(path)) + File.open(path, "w") {|f| f.write data } + end + + def test_initialize + opts = Fluent::Supervisor.default_options + sv = Fluent::Supervisor.new(opts) + opts.each { |k, v| + assert_equal v, sv.instance_variable_get("@#{k}") + } + end + + def test_read_config + create_info_dummy_logger + + tmp_dir = "#{TMP_DIR}/dir/test_read_config.conf" + conf_str = %[ + + @type forward + @id forward_input + + + @type stdout + @id stdout_output + +] + write_config tmp_dir, conf_str + opts = Fluent::Supervisor.default_options + sv = Fluent::Supervisor.new(opts) + + use_v1_config = {} + use_v1_config['use_v1_config'] = true + + sv.instance_variable_set(:@config_path, tmp_dir) + sv.instance_variable_set(:@use_v1_config, use_v1_config) + sv.send(:read_config) + + conf = sv.instance_variable_get(:@conf) + + elem = conf.elements.find { |e| e.name == 'source' } + assert_equal elem['@type'], "forward" + assert_equal elem['@id'], "forward_input" + + elem = conf.elements.find { |e| e.name == 'match' } + assert_equal elem.arg, "debug.**" + assert_equal elem['@type'], "stdout" + assert_equal elem['@id'], "stdout_output" + + $log.out.reset + end + + def test_system_config + opts = Fluent::Supervisor.default_options + sv = Fluent::Supervisor.new(opts) + conf_data = <<-EOC + + rpc_endpoint 127.0.0.1:24445 + suppress_repeated_stacktrace true + suppress_config_dump true + without_source true + enable_get_dump true + process_name "process_name" + log_level info + + EOC + conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true) + sv.instance_variable_set(:@conf, conf) + sv.send(:set_system_config) + sys_conf = sv.instance_variable_get(:@system_config) + + assert_equal sys_conf.rpc_endpoint, '127.0.0.1:24445' + assert_equal sys_conf.suppress_repeated_stacktrace, true + assert_equal sys_conf.suppress_config_dump, true + assert_equal sys_conf.without_source, true + assert_equal sys_conf.enable_get_dump, true + assert_equal sys_conf.process_name, "process_name" + assert_equal sys_conf.log_level, 2 + end + + def test_main_process_signal_handlers + create_info_dummy_logger + + unless Fluent.windows? + opts = Fluent::Supervisor.default_options + sv = Fluent::Supervisor.new(opts) + sv.send(:install_main_process_signal_handlers) + + begin + Process.kill :USR1, $$ + rescue + end + + sleep 1 + + info_msg = '[info]: force flushing buffered events' + "\n" + assert{ $log.out.logs.first.end_with?(info_msg) } + end + + $log.out.reset + end + + def test_supervisor_signal_handler + create_debug_dummy_logger + + unless Fluent.windows? + + install_supervisor_signal_handlers + begin + Process.kill :USR1, $$ + rescue + end + + sleep 1 + + debug_msg = '[debug]: fluentd supervisor process get SIGUSR1' + "\n" + assert{ $log.out.logs.first.end_with?(debug_msg) } + end + + $log.out.reset + end + + def test_rpc_server + create_info_dummy_logger + + unless Fluent.windows? + opts = Fluent::Supervisor.default_options + sv = Fluent::Supervisor.new(opts) + conf_data = <<-EOC + + rpc_endpoint 0.0.0.0:24447 + + EOC + conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true) + sv.instance_variable_set(:@conf, conf) + sv.send(:set_system_config) + sys_conf = sv.instance_variable_get(:@system_config) + @rpc_endpoint = sys_conf.rpc_endpoint + @enable_get_dump = sys_conf.enable_get_dump + + run_rpc_server + + sv.send(:install_main_process_signal_handlers) + Net::HTTP.get URI.parse('http://0.0.0.0:24447/api/plugins.flushBuffers') + info_msg = '[info]: force flushing buffered events' + "\n" + + stop_rpc_server + + # In TravisCI with OSX(Xcode), it seems that can't use rpc server. + # This test will be passed in such environment. + pend unless $log.out.logs.first + + assert{ $log.out.logs.first.end_with?(info_msg) } + end + + $log.out.reset + end + + def test_load_config + tmp_dir = "#{TMP_DIR}/dir/test_load_config.conf" + conf_info_str = %[ + + log_level info + +] + conf_debug_str = %[ + + log_level debug + +] + write_config tmp_dir, conf_info_str + + params = {} + params['use_v1_config'] = true + params['log_path'] = 'log_path' + params['suppress_repeated_stacktrace'] = true + params['log_level'] = Fluent::Log::LEVEL_INFO + load_config_proc = Proc.new { Fluent::Supervisor.load_config(tmp_dir, params) } + + # first call + se_config = load_config_proc.call + assert_equal se_config[:log_level], Fluent::Log::LEVEL_INFO + assert_equal se_config[:suppress_repeated_stacktrace], true + assert_equal se_config[:worker_type], 'spawn' + assert_equal se_config[:workers], 1 + assert_equal se_config[:log_stdin], false + assert_equal se_config[:log_stdout], false + assert_equal se_config[:log_stderr], false + assert_equal se_config[:enable_heartbeat], true + assert_equal se_config[:auto_heartbeat], false + + # second call immediately(reuse config) + se_config = load_config_proc.call + pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime'] + pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime'] + assert_nil pre_config_mtime + assert_nil pre_loadtime + + sleep 5 + + # third call after 5 seconds(don't reuse config) + se_config = load_config_proc.call + pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime'] + pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime'] + assert_not_nil pre_config_mtime + assert_not_nil pre_loadtime + + # forth call immediately(reuse config) + se_config = load_config_proc.call + # test that pre_config_mtime and pre_loadtime are not changed from previous one because reused pre_config + assert_equal se_config[:windows_daemon_cmdline][5]['pre_config_mtime'], pre_config_mtime + assert_equal se_config[:windows_daemon_cmdline][5]['pre_loadtime'], pre_loadtime + + write_config tmp_dir, conf_debug_str + + # fifth call after changed conf file(don't reuse config) + se_config = load_config_proc.call + assert_equal se_config[:log_level], Fluent::Log::LEVEL_DEBUG + end + + def test_logger + opts = Fluent::Supervisor.default_options + sv = Fluent::Supervisor.new(opts) + log = sv.instance_variable_get(:@log) + log.init + logger = $log.instance_variable_get(:@logger) + + assert_equal $log.level, Fluent::Log::LEVEL_INFO + + # test that DamonLogger#level= overwrites Fluent.log#level + logger.level = 'debug' + assert_equal $log.level, Fluent::Log::LEVEL_DEBUG + end + + def create_debug_dummy_logger + dl_opts = {} + dl_opts[:log_level] = ServerEngine::DaemonLogger::DEBUG + logdev = Fluent::Test::DummyLogDevice.new + logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) + $log = Fluent::Log.new(logger) + end + + def create_info_dummy_logger + dl_opts = {} + dl_opts[:log_level] = ServerEngine::DaemonLogger::INFO + logdev = Fluent::Test::DummyLogDevice.new + logger = ServerEngine::DaemonLogger.new(logdev, dl_opts) + $log = Fluent::Log.new(logger) + end +end