diff --git a/bin/fluent-ctl b/bin/fluent-ctl new file mode 100755 index 0000000000..ffc93a7cbc --- /dev/null +++ b/bin/fluent-ctl @@ -0,0 +1,7 @@ +#!/usr/bin/env ruby + +here = File.dirname(__FILE__) +$LOAD_PATH << File.expand_path(File.join(here, '..', 'lib')) +require 'fluent/command/ctl' + +Fluent::Ctl.new.call diff --git a/lib/fluent/command/ctl.rb b/lib/fluent/command/ctl.rb new file mode 100644 index 0000000000..a042c989e2 --- /dev/null +++ b/lib/fluent/command/ctl.rb @@ -0,0 +1,177 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'optparse' +require 'fluent/env' +require 'fluent/version' +if Fluent.windows? + require 'win32/event' + require 'win32/service' +end + +module Fluent + class Ctl + DEFAULT_OPTIONS = {} + + if Fluent.windows? + include Windows::ServiceConstants + include Windows::ServiceStructs + include Windows::ServiceFunctions + + COMMAND_MAP = { + shutdown: "", + restart: "HUP", + flush: "USR1", + reload: "USR2", + } + WINSVC_CONTROL_CODE_MAP = { + shutdown: SERVICE_CONTROL_STOP, + # 128 - 255: user-defined control code + # See https://docs.microsoft.com/en-us/windows/win32/api/winsvc/nf-winsvc-controlservice + restart: 128, + flush: 129, + reload: SERVICE_CONTROL_PARAMCHANGE, + } + else + COMMAND_MAP = { + shutdown: :TERM, + restart: :HUP, + flush: :USR1, + reload: :USR2, + } + end + + def initialize(argv = ARGV) + @argv = argv + @options = {} + @opt_parser = OptionParser.new + configure_option_parser + @options.merge!(DEFAULT_OPTIONS) + parse_options! + end + + def help_text + text = "\n" + if Fluent.windows? + text << "Usage: #{$PROGRAM_NAME} COMMAND [PID_OR_SVCNAME]\n" + else + text << "Usage: #{$PROGRAM_NAME} COMMAND PID\n" + end + text << "\n" + text << "Commands: \n" + COMMAND_MAP.each do |key, value| + text << " #{key}\n" + end + text + end + + def usage(msg = nil) + puts help_text + if msg + puts + puts "Error: #{msg}" + end + exit 1 + end + + def call + if Fluent.windows? + if @pid_or_svcname =~ /^[0-9]+$/ + # Use as PID + return call_windows_event(@command, "fluentd_#{@pid_or_svcname}") + end + + unless call_winsvc_control_code(@command, @pid_or_svcname) + puts "Cannot send control code to #{@pid_or_svcname} service, try to send an event with this name ..." + call_windows_event(@command, @pid_or_svcname) + end + else + call_signal(@command, @pid_or_svcname) + end + end + + private + + def call_signal(command, pid) + signal = COMMAND_MAP[command.to_sym] + Process.kill(signal, pid.to_i) + end + + def call_winsvc_control_code(command, pid_or_svcname) + status = SERVICE_STATUS.new + + begin + handle_scm = OpenSCManager(nil, nil, SC_MANAGER_CONNECT) + FFI.raise_windows_error('OpenSCManager') if handle_scm == 0 + + handle_scs = OpenService(handle_scm, "fluentdwinsvc", SERVICE_STOP | SERVICE_PAUSE_CONTINUE | SERVICE_USER_DEFINED_CONTROL) + FFI.raise_windows_error('OpenService') if handle_scs == 0 + + control_code = WINSVC_CONTROL_CODE_MAP[command.to_sym] + + unless ControlService(handle_scs, control_code, status) + FFI.raise_windows_error('ControlService') + end + rescue => e + puts e + state = status[:dwCurrentState] + return state == SERVICE_STOPPED || state == SERVICE_STOP_PENDING + ensure + CloseServiceHandle(handle_scs) + CloseServiceHandle(handle_scm) + end + + return true + end + + def call_windows_event(command, pid_or_svcname) + prefix = pid_or_svcname + event_name = COMMAND_MAP[command.to_sym] + suffix = event_name.empty? ? "" : "_#{event_name}" + + begin + event = Win32::Event.open("#{prefix}#{suffix}") + event.set + event.close + rescue Errno::ENOENT => e + puts "Error: Cannot find the fluentd process with the event name: \"#{prefix}\"" + end + end + + def configure_option_parser + @opt_parser.banner = help_text + @opt_parser.version = Fluent::VERSION + end + + def parse_options! + @opt_parser.parse!(@argv) + + @command = @argv[0] + @pid_or_svcname = @argv[1] || "fluentdwinsvc" + + usage("Command isn't specified!") if @command.nil? || @command.empty? + usage("Unknown command: #{@command}") unless COMMAND_MAP.has_key?(@command.to_sym) + + if Fluent.windows? + usage("PID or SVCNAME isn't specified!") if @pid_or_svcname.nil? || @pid_or_svcname.empty? + else + usage("PID isn't specified!") if @pid_or_svcname.nil? || @pid_or_svcname.empty? + usage("Invalid PID: #{pid}") unless @pid_or_svcname =~ /^[0-9]+$/ + end + end + end +end + diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 33841b5d90..bd8b24af0b 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -53,11 +53,11 @@ def before_run @enable_get_dump = config[:enable_get_dump] run_rpc_server end - install_supervisor_signal_handlers - if config[:signame] - @signame = config[:signame] + if Fluent.windows? install_windows_event_handler + else + install_supervisor_signal_handlers end if counter = config[:counter_server] @@ -70,6 +70,7 @@ def before_run end def after_run + stop_windows_event_thread if Fluent.windows? stop_rpc_server if @rpc_endpoint stop_counter_server if @counter Fluent::Supervisor.cleanup_resources @@ -92,7 +93,8 @@ def run_rpc_server @rpc_server.mount_proc('/api/processes.flushBuffersAndKillWorkers') { |req, res| $log.debug "fluentd RPC got /api/processes.flushBuffersAndKillWorkers request" if Fluent.windows? - $log.warn "operation 'flushBuffersAndKillWorkers' is not supported on Windows now." + supervisor_sigusr1_handler + stop(true) else Process.kill :USR1, $$ Process.kill :TERM, $$ @@ -101,7 +103,9 @@ def run_rpc_server } @rpc_server.mount_proc('/api/plugins.flushBuffers') { |req, res| $log.debug "fluentd RPC got /api/plugins.flushBuffers request" - unless Fluent.windows? + if Fluent.windows? + supervisor_sigusr1_handler + else Process.kill :USR1, $$ end nil @@ -125,7 +129,9 @@ def run_rpc_server @rpc_server.mount_proc('/api/config.gracefulReload') { |req, res| $log.debug "fluentd RPC got /api/config.gracefulReload request" - unless Fluent.windows? + if Fluent.windows? + supervisor_sigusr2_handler + else Process.kill :USR2, $$ end @@ -159,37 +165,101 @@ def stop_counter_server end def install_supervisor_signal_handlers + return if Fluent.windows? + trap :HUP do $log.debug "fluentd supervisor process get SIGHUP" supervisor_sighup_handler - end unless Fluent.windows? + end trap :USR1 do $log.debug "fluentd supervisor process get SIGUSR1" supervisor_sigusr1_handler - end unless Fluent.windows? + end trap :USR2 do $log.debug 'fluentd supervisor process got SIGUSR2' supervisor_sigusr2_handler - end unless Fluent.windows? + end + end + + if Fluent.windows? + # Override some methods of ServerEngine::MultiSpawnWorker + # Since Fluentd's Supervisor doesn't use ServerEngine's HUP, USR1 and USR2 + # handlers (see install_supervisor_signal_handlers), they should be + # disabled also on Windows, just send commands to workers instead. + def restart(graceful) + @monitors.each do |m| + m.send_command(graceful ? "GRACEFUL_RESTART\n" : "IMMEDIATE_RESTART\n") + end + end + + def reload + @monitors.each do |m| + m.send_command("RELOAD\n") + end + end end def install_windows_event_handler + return unless Fluent.windows? + + @pid_signame = "fluentd_#{$$}" + @signame = config[:signame] + Thread.new do - ev = Win32::Event.new(@signame) + ipc = Win32::Ipc.new(nil) + events = [ + Win32::Event.new("#{@pid_signame}_STOP_EVENT_THREAD"), + Win32::Event.new("#{@pid_signame}"), + Win32::Event.new("#{@pid_signame}_HUP"), + Win32::Event.new("#{@pid_signame}_USR1"), + Win32::Event.new("#{@pid_signame}_USR2"), + ] + if @signame + signame_events = [ + Win32::Event.new("#{@signame}"), + Win32::Event.new("#{@signame}_HUP"), + Win32::Event.new("#{@signame}_USR1"), + Win32::Event.new("#{@signame}_USR2"), + ] + events.concat(signame_events) + end begin - ev.reset - until WaitForSingleObject(ev.handle, 0) == WAIT_OBJECT_0 - sleep 1 + loop do + idx = ipc.wait_any(events, Windows::Synchronize::INFINITE) + if idx > 0 && idx <= events.length + $log.debug("Got Win32 event \"#{events[idx - 1].name}\"") + else + $log.warn("Unexpected reutrn value of Win32::Ipc#wait_any: #{idx}") + end + case idx + when 2, 6 + stop(true) + when 3, 7 + supervisor_sighup_handler + when 4, 8 + supervisor_sigusr1_handler + when 5, 9 + supervisor_sigusr2_handler + when 1 + break + end end - stop(true) ensure - ev.close + events.each { |event| event.close } end end end + def stop_windows_event_thread + if Fluent.windows? + ev = Win32::Event.open("#{@pid_signame}_STOP_EVENT_THREAD") + ev.set + ev.close + end + end + def supervisor_sighup_handler kill_worker end @@ -264,9 +334,25 @@ def reopen_log def send_signal_to_workers(signal) return unless config[:worker_pid] - config[:worker_pid].each_value do |pid| - # don't rescue Errno::ESRCH here (invalid status) - Process.kill(signal, pid) + if Fluent.windows? + send_command_to_workers(signal) + else + config[:worker_pid].each_value do |pid| + # don't rescue Errno::ESRCH here (invalid status) + Process.kill(signal, pid) + end + end + end + + def send_command_to_workers(signal) + # Use SeverEngine's CommandSender on Windows + case signal + when :HUP + restart(false) + when :USR1 + restart(true) + when :USR2 + reload end end end @@ -745,33 +831,45 @@ def install_main_process_signal_handlers end end - trap :USR1 do - flush_buffer - end unless Fluent.windows? + if Fluent.windows? + install_main_process_command_handlers + else + trap :USR1 do + flush_buffer + end - trap :USR2 do - reload_config - end unless Fluent.windows? + trap :USR2 do + reload_config + end + end + end - if Fluent.windows? - command_pipe = STDIN.dup - STDIN.reopen(File::NULL, "rb") - command_pipe.binmode - command_pipe.sync = true + def install_main_process_command_handlers + command_pipe = $stdin.dup + $stdin.reopen(File::NULL, "rb") + command_pipe.binmode + command_pipe.sync = true - Thread.new do - loop do - cmd = command_pipe.gets.chomp - case cmd - when "GRACEFUL_STOP", "IMMEDIATE_STOP" - $log.debug "fluentd main process get #{cmd} command" - @finished = true - $log.debug "getting start to shutdown main process" - Fluent::Engine.stop - break - else - $log.warn "fluentd main process get unknown command [#{cmd}]" - end + Thread.new do + loop do + cmd = command_pipe.gets + break unless cmd + + case cmd.chomp! + when "GRACEFUL_STOP", "IMMEDIATE_STOP" + $log.debug "fluentd main process get #{cmd} command" + @finished = true + $log.debug "getting start to shutdown main process" + Fluent::Engine.stop + break + when "GRACEFUL_RESTART" + $log.debug "fluentd main process get #{cmd} command" + flush_buffer + when "RELOAD" + $log.debug "fluentd main process get #{cmd} command" + reload_config + else + $log.warn "fluentd main process get unknown command [#{cmd}]" end end end diff --git a/lib/fluent/winsvc.rb b/lib/fluent/winsvc.rb index 23d76bab16..d1b893168d 100644 --- a/lib/fluent/winsvc.rb +++ b/lib/fluent/winsvc.rb @@ -61,7 +61,6 @@ def initialize(service_name) end def service_main - @pid = service_main_start(@service_name) while running? sleep 10 @@ -69,13 +68,32 @@ def service_main end def service_stop - ev = Win32::Event.open(@service_name) - ev.set - ev.close + set_event(@service_name) if @pid > 0 Process.waitpid(@pid) end end + + def service_paramchange + set_event("#{@service_name}_USR2") + end + + def service_user_defined_control(code) + case code + when 128 + set_event("#{@service_name}_HUP") + when 129 + set_event("#{@service_name}_USR1") + end + end + + private + + def set_event(event_name) + ev = Win32::Event.open(event_name) + ev.set + ev.close + end end FluentdService.new(opts[:service_name]).mainloop diff --git a/test/command/test_ctl.rb b/test/command/test_ctl.rb new file mode 100644 index 0000000000..4ab43a0d7a --- /dev/null +++ b/test/command/test_ctl.rb @@ -0,0 +1,57 @@ +require_relative '../helper' + +require 'test-unit' +require 'win32/event' if Fluent.windows? + +require 'fluent/command/ctl' + +class TestFluentdCtl < ::Test::Unit::TestCase + def assert_win32_event(event_name, command, pid_or_svcname) + command, event_suffix = data + event = Win32::Event.new(event_name) + ipc = Win32::Ipc.new(event.handle) + ret = Win32::Ipc::TIMEOUT + + wait_thread = Thread.new do + ret = ipc.wait(1) + end + Fluent::Ctl.new([command, pid_or_svcname]).call + wait_thread.join + assert_equal(Win32::Ipc::SIGNALED, ret) + end + + data("shutdown" => ["shutdown", "TERM", ""], + "restart" => ["restart", "HUP", "HUP"], + "flush" => ["flush", "USR1", "USR1"], + "reload" => ["reload", "USR2", "USR2"]) + def test_commands(data) + command, signal, event_suffix = data + + if Fluent.windows? + event_name = "fluentd_54321" + event_name << "_#{event_suffix}" unless event_suffix.empty? + assert_win32_event(event_name, command, "54321") + else + got_signal = false + Signal.trap(signal) do + got_signal = true + end + Fluent::Ctl.new([command, "#{$$}"]).call + assert_true(got_signal) + end + end + + data("shutdown" => ["shutdown", ""], + "restart" => ["restart", "HUP"], + "flush" => ["flush", "USR1"], + "reload" => ["reload", "USR2"]) + def test_commands_with_winsvcname(data) + omit "Only for Windows" unless Fluent.windows? + + command, event_suffix = data + event_name = "testfluentdwinsvc" + event_name << "_#{event_suffix}" unless event_suffix.empty? + + assert_win32_event(event_name, command, "testfluentdwinsvc") + end +end diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb index 9e2296c8c2..be1ddaa3f3 100644 --- a/test/test_supervisor.rb +++ b/test/test_supervisor.rb @@ -111,6 +111,32 @@ def test_main_process_signal_handlers $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset) end + def test_main_process_command_handlers + omit "Only for Windows, alternative to UNIX signals" unless Fluent.windows? + + create_info_dummy_logger + + opts = Fluent::Supervisor.default_options + sv = Fluent::Supervisor.new(opts) + r, w = IO.pipe + $stdin = r + sv.send(:install_main_process_signal_handlers) + + begin + w.write("GRACEFUL_RESTART\n") + w.flush + ensure + $stdin = STDIN + end + + sleep 1 + + info_msg = '[info]: force flushing buffered events' + "\n" + assert{ $log.out.logs.first.end_with?(info_msg) } + ensure + $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset) + end + def test_supervisor_signal_handler omit "Windows cannot handle signals" if Fluent.windows? @@ -137,21 +163,53 @@ def test_windows_shutdown_event server = DummyServer.new def server.config - {:signame => "TestFluentdEvent", :worker_pid => {0 => 1234}} + {:signame => "TestFluentdEvent"} end mock(server).stop(true) stub(Process).kill.times(0) - server.before_run server.install_windows_event_handler - sleep 0.1 # Wait for starting windows event thread - event = Win32::Event.open("TestFluentdEvent") - event.set - event.close - # Wait for stopping windows event thread. Should larger than 1 sec - # because the thread is awaked every 1 sec. - sleep 1.1 + begin + sleep 0.1 # Wait for starting windows event thread + event = Win32::Event.open("TestFluentdEvent") + event.set + event.close + ensure + server.stop_windows_event_thread + end + + debug_msg = '[debug]: Got Win32 event "TestFluentdEvent"' + logs = $log.out.logs + assert{ logs.any?{|log| log.include?(debug_msg) } } + ensure + $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset) + end + + def test_supervisor_event_handler + omit "Only for Windows, alternative to UNIX signals" unless Fluent.windows? + + create_debug_dummy_logger + + server = DummyServer.new + def server.config + {:signame => "TestFluentdEvent"} + end + server.install_windows_event_handler + begin + sleep 0.1 # Wait for starting windows event thread + event = Win32::Event.open("TestFluentdEvent_USR1") + event.set + event.close + ensure + server.stop_windows_event_thread + end + + debug_msg = '[debug]: Got Win32 event "TestFluentdEvent_USR1"' + logs = $log.out.logs + assert{ logs.any?{|log| log.include?(debug_msg) } } + ensure + $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset) end def test_rpc_server @@ -176,7 +234,7 @@ def test_rpc_server server.run_rpc_server sv.send(:install_main_process_signal_handlers) - Net::HTTP.get URI.parse('http://0.0.0.0:24447/api/plugins.flushBuffers') + response = Net::HTTP.get(URI.parse('http://127.0.0.1:24447/api/plugins.flushBuffers')) info_msg = '[info]: force flushing buffered events' + "\n" server.stop_rpc_server @@ -185,11 +243,45 @@ def test_rpc_server # This test will be passed in such environment. pend unless $log.out.logs.first + assert_equal('{"ok":true}', response) assert{ $log.out.logs.first.end_with?(info_msg) } ensure $log.out.reset if $log.out.is_a?(Fluent::Test::DummyLogDevice) end + def test_rpc_server_windows + omit "Only for windows platform" unless Fluent.windows? + + create_info_dummy_logger + + opts = Fluent::Supervisor.default_options + sv = Fluent::Supervisor.new(opts) + conf_data = <<-EOC + + rpc_endpoint 0.0.0.0:24447 + + EOC + conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true) + sys_conf = sv.__send__(:build_system_config, conf) + + server = DummyServer.new + def server.config + { + :signame => "TestFluentdEvent", + :worker_pid => 5963, + } + end + server.rpc_endpoint = sys_conf.rpc_endpoint + + server.run_rpc_server + + mock(server).restart(true) { nil } + response = Net::HTTP.get(URI.parse('http://127.0.0.1:24447/api/plugins.flushBuffers')) + + server.stop_rpc_server + assert_equal('{"ok":true}', response) + end + def test_load_config tmp_dir = "#{TMP_DIR}/dir/test_load_config.conf" conf_info_str = %[