From d0fd72a399a11f7c1412b734b8186b38ddc68483 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Tue, 8 Sep 2020 13:36:07 +0900 Subject: [PATCH 01/10] Simplify detecting whether to use unix signal or windows event Signed-off-by: Takuro Ashie --- lib/fluent/supervisor.rb | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 33841b5d90..4c9503da44 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -53,11 +53,11 @@ def before_run @enable_get_dump = config[:enable_get_dump] run_rpc_server end - install_supervisor_signal_handlers - if config[:signame] - @signame = config[:signame] + if Fluent.windows? install_windows_event_handler + else + install_supervisor_signal_handlers end if counter = config[:counter_server] @@ -159,23 +159,30 @@ def stop_counter_server end def install_supervisor_signal_handlers + return if Fluent.windows? + trap :HUP do $log.debug "fluentd supervisor process get SIGHUP" supervisor_sighup_handler - end unless Fluent.windows? + end trap :USR1 do $log.debug "fluentd supervisor process get SIGUSR1" supervisor_sigusr1_handler - end unless Fluent.windows? + end trap :USR2 do $log.debug 'fluentd supervisor process got SIGUSR2' supervisor_sigusr2_handler - end unless Fluent.windows? + end end def install_windows_event_handler + return unless Fluent.windows? + return unless config[:signame] + + @signame = config[:signame] + Thread.new do ev = Win32::Event.new(@signame) begin From 10c69acc36df37d394bde1d6d17b9384cb57deae Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Fri, 13 Nov 2020 17:55:06 +0900 Subject: [PATCH 02/10] Use win32 event as alternative to UNIX signal Signed-off-by: Takuro Ashie --- lib/fluent/supervisor.rb | 143 ++++++++++++++++++++++++++++++--------- test/test_supervisor.rb | 76 ++++++++++++++++++--- 2 files changed, 177 insertions(+), 42 deletions(-) diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 4c9503da44..3eca354bae 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -70,6 +70,7 @@ def before_run end def after_run + stop_windows_event_thread if Fluent.windows? stop_rpc_server if @rpc_endpoint stop_counter_server if @counter Fluent::Supervisor.cleanup_resources @@ -177,6 +178,24 @@ def install_supervisor_signal_handlers end end + if Fluent.windows? + # Override some methods of ServerEngine::MultiSpawnWorker + # Since Fluentd's Supervisor doesn't use ServerEngine's HUP, USR1 and USR2 + # handlers (see install_supervisor_signal_handlers), they should be + # disabled also on Windows, just send commands to workers instead. + def restart(graceful) + @monitors.each do |m| + m.send_command(graceful ? "GRACEFUL_RESTART\n" : "IMMEDIATE_RESTART\n") + end + end + + def reload + @monitors.each do |m| + m.send_command("RELOAD\n") + end + end + end + def install_windows_event_handler return unless Fluent.windows? return unless config[:signame] @@ -184,19 +203,49 @@ def install_windows_event_handler @signame = config[:signame] Thread.new do - ev = Win32::Event.new(@signame) + ipc = Win32::Ipc.new(nil) + events = [ + Win32::Event.new("#{@signame}"), + Win32::Event.new("#{@signame}_HUP"), + Win32::Event.new("#{@signame}_USR1"), + Win32::Event.new("#{@signame}_USR2"), + Win32::Event.new("#{@signame}_STOP_EVENT_THREAD"), + ] begin - ev.reset - until WaitForSingleObject(ev.handle, 0) == WAIT_OBJECT_0 - sleep 1 + loop do + idx = ipc.wait_any(events, Windows::Synchronize::INFINITE) + if idx > 0 && idx <= events.length + $log.debug("Got Win32 event \"#{events[idx - 1].name}\"") + else + $log.warn("Unexpected reutrn value of Win32::Ipc#wait_any: #{idx}") + end + case idx + when 1 + stop(true) + when 2 + supervisor_sighup_handler + when 3 + supervisor_sigusr1_handler + when 4 + supervisor_sigusr2_handler + when 5 + break + end end - stop(true) ensure - ev.close + events.each { |event| event.close } end end end + def stop_windows_event_thread + if Fluent.windows? && @signame + ev = Win32::Event.open("#{@signame}_STOP_EVENT_THREAD") + ev.set + ev.close + end + end + def supervisor_sighup_handler kill_worker end @@ -271,9 +320,25 @@ def reopen_log def send_signal_to_workers(signal) return unless config[:worker_pid] - config[:worker_pid].each_value do |pid| - # don't rescue Errno::ESRCH here (invalid status) - Process.kill(signal, pid) + if Fluent.windows? + send_command_to_workers(signal) + else + config[:worker_pid].each_value do |pid| + # don't rescue Errno::ESRCH here (invalid status) + Process.kill(signal, pid) + end + end + end + + def send_command_to_workers(signal) + # Use SeverEngine's CommandSender on Windows + case signal + when :HUP + restart(false) + when :USR1 + restart(true) + when :USR2 + reload end end end @@ -752,33 +817,45 @@ def install_main_process_signal_handlers end end - trap :USR1 do - flush_buffer - end unless Fluent.windows? + if Fluent.windows? + install_main_process_command_handlers + else + trap :USR1 do + flush_buffer + end - trap :USR2 do - reload_config - end unless Fluent.windows? + trap :USR2 do + reload_config + end + end + end - if Fluent.windows? - command_pipe = STDIN.dup - STDIN.reopen(File::NULL, "rb") - command_pipe.binmode - command_pipe.sync = true + def install_main_process_command_handlers + command_pipe = $stdin.dup + $stdin.reopen(File::NULL, "rb") + command_pipe.binmode + command_pipe.sync = true - Thread.new do - loop do - cmd = command_pipe.gets.chomp - case cmd - when "GRACEFUL_STOP", "IMMEDIATE_STOP" - $log.debug "fluentd main process get #{cmd} command" - @finished = true - $log.debug "getting start to shutdown main process" - Fluent::Engine.stop - break - else - $log.warn "fluentd main process get unknown command [#{cmd}]" - end + Thread.new do + loop do + cmd = command_pipe.gets + break unless cmd + + case cmd.chomp! + when "GRACEFUL_STOP", "IMMEDIATE_STOP" + $log.debug "fluentd main process get #{cmd} command" + @finished = true + $log.debug "getting start to shutdown main process" + Fluent::Engine.stop + break + when "GRACEFUL_RESTART" + $log.debug "fluentd main process get #{cmd} command" + flush_buffer + when "RELOAD" + $log.debug "fluentd main process get #{cmd} command" + reload_config + else + $log.warn "fluentd main process get unknown command [#{cmd}]" end end end diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb index 9e2296c8c2..9e3e039308 100644 --- a/test/test_supervisor.rb +++ b/test/test_supervisor.rb @@ -111,6 +111,32 @@ def test_main_process_signal_handlers $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset) end + def test_main_process_command_handlers + omit "Only for Windows, alternative to UNIX signals" unless Fluent.windows? + + create_info_dummy_logger + + opts = Fluent::Supervisor.default_options + sv = Fluent::Supervisor.new(opts) + r, w = IO.pipe + $stdin = r + sv.send(:install_main_process_signal_handlers) + + begin + w.write("GRACEFUL_RESTART\n") + w.flush + ensure + $stdin = STDIN + end + + sleep 1 + + info_msg = '[info]: force flushing buffered events' + "\n" + assert{ $log.out.logs.first.end_with?(info_msg) } + ensure + $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset) + end + def test_supervisor_signal_handler omit "Windows cannot handle signals" if Fluent.windows? @@ -137,21 +163,53 @@ def test_windows_shutdown_event server = DummyServer.new def server.config - {:signame => "TestFluentdEvent", :worker_pid => {0 => 1234}} + {:signame => "TestFluentdEvent"} end mock(server).stop(true) stub(Process).kill.times(0) - server.before_run server.install_windows_event_handler - sleep 0.1 # Wait for starting windows event thread - event = Win32::Event.open("TestFluentdEvent") - event.set - event.close - # Wait for stopping windows event thread. Should larger than 1 sec - # because the thread is awaked every 1 sec. - sleep 1.1 + begin + sleep 0.1 # Wait for starting windows event thread + event = Win32::Event.open("TestFluentdEvent") + event.set + event.close + ensure + server.stop_windows_event_thread + end + + debug_msg = '[debug]: Got Win32 event "TestFluentdEvent"' + logs = $log.out.logs + assert{ logs.any?{|log| log.include?(debug_msg) } } + ensure + $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset) + end + + def test_supervisor_event_handler + omit "Only for Windows, alternative to UNIX signals" unless Fluent.windows? + + create_debug_dummy_logger + + server = DummyServer.new + def server.config + {:signame => "TestFluentdEvent"} + end + server.install_windows_event_handler + begin + sleep 0.1 # Wait for starting windows event thread + event = Win32::Event.open("TestFluentdEvent_USR1") + event.set + event.close + ensure + server.stop_windows_event_thread + end + + debug_msg = '[debug]: Got Win32 event "TestFluentdEvent_USR1"' + logs = $log.out.logs + assert{ logs.any?{|log| log.include?(debug_msg) } } + ensure + $log.out.reset if $log && $log.out && $log.out.respond_to?(:reset) end def test_rpc_server From b58f3c2b02acd1ce3f3ef264dc2b6b746ef58d4a Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Thu, 17 Sep 2020 11:51:34 +0900 Subject: [PATCH 03/10] Implement missing RPC methods on Windows In addition, fix an invalid destination IP address "0.0.0.0" in a related test. Signed-off-by: Takuro Ashie --- lib/fluent/supervisor.rb | 11 ++++++++--- test/test_supervisor.rb | 34 +++++++++++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 3eca354bae..4228e40451 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -93,7 +93,8 @@ def run_rpc_server @rpc_server.mount_proc('/api/processes.flushBuffersAndKillWorkers') { |req, res| $log.debug "fluentd RPC got /api/processes.flushBuffersAndKillWorkers request" if Fluent.windows? - $log.warn "operation 'flushBuffersAndKillWorkers' is not supported on Windows now." + supervisor_sigusr1_handler + stop(true) else Process.kill :USR1, $$ Process.kill :TERM, $$ @@ -102,7 +103,9 @@ def run_rpc_server } @rpc_server.mount_proc('/api/plugins.flushBuffers') { |req, res| $log.debug "fluentd RPC got /api/plugins.flushBuffers request" - unless Fluent.windows? + if Fluent.windows? + supervisor_sigusr1_handler + else Process.kill :USR1, $$ end nil @@ -126,7 +129,9 @@ def run_rpc_server @rpc_server.mount_proc('/api/config.gracefulReload') { |req, res| $log.debug "fluentd RPC got /api/config.gracefulReload request" - unless Fluent.windows? + if Fluent.windows? + supervisor_sigusr2_handler + else Process.kill :USR2, $$ end diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb index 9e3e039308..e07e0cc460 100644 --- a/test/test_supervisor.rb +++ b/test/test_supervisor.rb @@ -234,7 +234,7 @@ def test_rpc_server server.run_rpc_server sv.send(:install_main_process_signal_handlers) - Net::HTTP.get URI.parse('http://0.0.0.0:24447/api/plugins.flushBuffers') + Net::HTTP.get URI.parse('http://127.0.0.1:24447/api/plugins.flushBuffers') info_msg = '[info]: force flushing buffered events' + "\n" server.stop_rpc_server @@ -248,6 +248,38 @@ def test_rpc_server $log.out.reset if $log.out.is_a?(Fluent::Test::DummyLogDevice) end + def test_rpc_server_windows + omit "Only for windows platform" unless Fluent.windows? + + create_info_dummy_logger + + opts = Fluent::Supervisor.default_options + sv = Fluent::Supervisor.new(opts) + conf_data = <<-EOC + + rpc_endpoint 0.0.0.0:24447 + + EOC + conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true) + sys_conf = sv.__send__(:build_system_config, conf) + + server = DummyServer.new + def server.config + { + :signame => "TestFluentdEvent", + :worker_pid => 5963, + } + end + server.rpc_endpoint = sys_conf.rpc_endpoint + + server.run_rpc_server + + mock(server).restart(true) { nil } + Net::HTTP.get URI.parse('http://127.0.0.1:24447/api/plugins.flushBuffers') + + server.stop_rpc_server + end + def test_load_config tmp_dir = "#{TMP_DIR}/dir/test_load_config.conf" conf_info_str = %[ From c408ef7b4057ea8da124232f06fcee334fbd1bcc Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Fri, 9 Oct 2020 15:29:09 +0900 Subject: [PATCH 04/10] Add PID base events for Windows Signed-off-by: Takuro Ashie --- lib/fluent/supervisor.rb | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 4228e40451..bd8b24af0b 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -203,19 +203,28 @@ def reload def install_windows_event_handler return unless Fluent.windows? - return unless config[:signame] + @pid_signame = "fluentd_#{$$}" @signame = config[:signame] Thread.new do ipc = Win32::Ipc.new(nil) events = [ - Win32::Event.new("#{@signame}"), - Win32::Event.new("#{@signame}_HUP"), - Win32::Event.new("#{@signame}_USR1"), - Win32::Event.new("#{@signame}_USR2"), - Win32::Event.new("#{@signame}_STOP_EVENT_THREAD"), + Win32::Event.new("#{@pid_signame}_STOP_EVENT_THREAD"), + Win32::Event.new("#{@pid_signame}"), + Win32::Event.new("#{@pid_signame}_HUP"), + Win32::Event.new("#{@pid_signame}_USR1"), + Win32::Event.new("#{@pid_signame}_USR2"), ] + if @signame + signame_events = [ + Win32::Event.new("#{@signame}"), + Win32::Event.new("#{@signame}_HUP"), + Win32::Event.new("#{@signame}_USR1"), + Win32::Event.new("#{@signame}_USR2"), + ] + events.concat(signame_events) + end begin loop do idx = ipc.wait_any(events, Windows::Synchronize::INFINITE) @@ -225,15 +234,15 @@ def install_windows_event_handler $log.warn("Unexpected reutrn value of Win32::Ipc#wait_any: #{idx}") end case idx - when 1 + when 2, 6 stop(true) - when 2 + when 3, 7 supervisor_sighup_handler - when 3 + when 4, 8 supervisor_sigusr1_handler - when 4 + when 5, 9 supervisor_sigusr2_handler - when 5 + when 1 break end end @@ -244,8 +253,8 @@ def install_windows_event_handler end def stop_windows_event_thread - if Fluent.windows? && @signame - ev = Win32::Event.open("#{@signame}_STOP_EVENT_THREAD") + if Fluent.windows? + ev = Win32::Event.open("#{@pid_signame}_STOP_EVENT_THREAD") ev.set ev.close end From 8eb60faaf5580412fb9418482cf1961b85de7328 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Thu, 8 Oct 2020 16:04:13 +0900 Subject: [PATCH 05/10] Add fluent-ctl command It sends a simple command to a fluentd process. The main purpose of this command is that providing a kill command alternative on Windows. On UNIX, it just only send a UNIX signal to a specified process. Usage: fluent-ctl COMMAND PID_OR_SIGNAME Commands: shutdown restart flush reload SIGNAME can be specified by fluentd's "-x" or "--signame" option. Note that this command doesn't support sending events to windows service. Signed-off-by: Takuro Ashie --- bin/fluent-ctl | 7 ++ lib/fluent/command/ctl.rb | 132 ++++++++++++++++++++++++++++++++++++++ test/command/test_ctl.rb | 57 ++++++++++++++++ 3 files changed, 196 insertions(+) create mode 100755 bin/fluent-ctl create mode 100644 lib/fluent/command/ctl.rb create mode 100644 test/command/test_ctl.rb diff --git a/bin/fluent-ctl b/bin/fluent-ctl new file mode 100755 index 0000000000..ffc93a7cbc --- /dev/null +++ b/bin/fluent-ctl @@ -0,0 +1,7 @@ +#!/usr/bin/env ruby + +here = File.dirname(__FILE__) +$LOAD_PATH << File.expand_path(File.join(here, '..', 'lib')) +require 'fluent/command/ctl' + +Fluent::Ctl.new.call diff --git a/lib/fluent/command/ctl.rb b/lib/fluent/command/ctl.rb new file mode 100644 index 0000000000..625cae7c30 --- /dev/null +++ b/lib/fluent/command/ctl.rb @@ -0,0 +1,132 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'optparse' +require 'fluent/env' +require 'fluent/version' +require 'win32/event' if Fluent.windows? + +module Fluent + class Ctl + DEFAULT_OPTIONS = {} + UNIX_SIGNAL_MAP = { + shutdown: :TERM, + restart: :HUP, + flush: :USR1, + reload: :USR2, + } + WINDOWS_EVENT_MAP = { + shutdown: "", + restart: "HUP", + flush: "USR1", + reload: "USR2", + } + if Fluent.windows? + COMMAND_MAP = WINDOWS_EVENT_MAP + else + COMMAND_MAP = UNIX_SIGNAL_MAP + end + + def initialize(argv = ARGV) + @argv = argv + @options = {} + @opt_parser = OptionParser.new + configure_option_parser + @options.merge!(DEFAULT_OPTIONS) + parse_options! + end + + def help_text + text = "\n" + if Fluent.windows? + text << "Usage: #{$PROGRAM_NAME} COMMAND PID_OR_SIGNAME\n" + else + text << "Usage: #{$PROGRAM_NAME} COMMAND PID\n" + end + text << "\n" + text << "Commands: \n" + COMMAND_MAP.each do |key, value| + text << " #{key}\n" + end + text + end + + def usage(msg = nil) + puts help_text + if msg + puts + puts "Error: #{msg}" + end + exit 1 + end + + def call + if Fluent.windows? + call_windows_event(@command, @pid_or_signame) + else + call_signal(@command, @pid_or_signame) + end + end + + private + + def call_signal(command, pid) + signal = COMMAND_MAP[command.to_sym] + Process.kill(signal, pid.to_i) + end + + def call_windows_event(command, pid_or_signame) + if pid_or_signame =~ /^[0-9]+$/ + prefix = "fluentd_#{pid_or_signame}" + else + prefix = pid_or_signame + end + event_name = COMMAND_MAP[command.to_sym] + suffix = event_name.empty? ? "" : "_#{event_name}" + + begin + event = Win32::Event.open("#{prefix}#{suffix}") + event.set + event.close + rescue Errno::ENOENT => e + puts "Error: Cannot find the fluentd process with the event name: \"#{prefix}\"" + end + end + + def configure_option_parser + @opt_parser.banner = help_text + @opt_parser.version = Fluent::VERSION + end + + def parse_options! + @opt_parser.parse!(@argv) + + @command = @argv[0] + @pid_or_signame = @argv[1] + + usage("Command isn't specified!") if @command.nil? || @command.empty? + usage("Unknown command: #{@command}") unless COMMAND_MAP.has_key?(@command.to_sym) + + if Fluent.windows? + usage("PID or SIGNAME isn't specified!") if @pid_or_signame.nil? || @pid_or_signame.empty? + else + usage("PID isn't specified!") if @pid_or_signame.nil? || @pid_or_signame.empty? + usage("Invalid PID: #{pid}") unless @pid_or_signame =~ /^[0-9]+$/ + end + end + end +end + diff --git a/test/command/test_ctl.rb b/test/command/test_ctl.rb new file mode 100644 index 0000000000..4ab43a0d7a --- /dev/null +++ b/test/command/test_ctl.rb @@ -0,0 +1,57 @@ +require_relative '../helper' + +require 'test-unit' +require 'win32/event' if Fluent.windows? + +require 'fluent/command/ctl' + +class TestFluentdCtl < ::Test::Unit::TestCase + def assert_win32_event(event_name, command, pid_or_svcname) + command, event_suffix = data + event = Win32::Event.new(event_name) + ipc = Win32::Ipc.new(event.handle) + ret = Win32::Ipc::TIMEOUT + + wait_thread = Thread.new do + ret = ipc.wait(1) + end + Fluent::Ctl.new([command, pid_or_svcname]).call + wait_thread.join + assert_equal(Win32::Ipc::SIGNALED, ret) + end + + data("shutdown" => ["shutdown", "TERM", ""], + "restart" => ["restart", "HUP", "HUP"], + "flush" => ["flush", "USR1", "USR1"], + "reload" => ["reload", "USR2", "USR2"]) + def test_commands(data) + command, signal, event_suffix = data + + if Fluent.windows? + event_name = "fluentd_54321" + event_name << "_#{event_suffix}" unless event_suffix.empty? + assert_win32_event(event_name, command, "54321") + else + got_signal = false + Signal.trap(signal) do + got_signal = true + end + Fluent::Ctl.new([command, "#{$$}"]).call + assert_true(got_signal) + end + end + + data("shutdown" => ["shutdown", ""], + "restart" => ["restart", "HUP"], + "flush" => ["flush", "USR1"], + "reload" => ["reload", "USR2"]) + def test_commands_with_winsvcname(data) + omit "Only for Windows" unless Fluent.windows? + + command, event_suffix = data + event_name = "testfluentdwinsvc" + event_name << "_#{event_suffix}" unless event_suffix.empty? + + assert_win32_event(event_name, command, "testfluentdwinsvc") + end +end From 7dee737ee6fb36a9357911745d1081d72a7d6bfb Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Thu, 19 Nov 2020 18:30:19 +0900 Subject: [PATCH 06/10] Add grace-reload trigger for windows service You can trigger it by the following command: sc control fluentdwinsvc paramchange Signed-off-by: Takuro Ashie --- lib/fluent/winsvc.rb | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/lib/fluent/winsvc.rb b/lib/fluent/winsvc.rb index 23d76bab16..9c7b3605d3 100644 --- a/lib/fluent/winsvc.rb +++ b/lib/fluent/winsvc.rb @@ -61,7 +61,6 @@ def initialize(service_name) end def service_main - @pid = service_main_start(@service_name) while running? sleep 10 @@ -69,13 +68,23 @@ def service_main end def service_stop - ev = Win32::Event.open(@service_name) - ev.set - ev.close + set_event(@service_name) if @pid > 0 Process.waitpid(@pid) end end + + def service_paramchange + set_event("#{@service_name}_USR2") + end + + private + + def set_event(event_name) + ev = Win32::Event.open(event_name) + ev.set + ev.close + end end FluentdService.new(opts[:service_name]).mainloop From d676283c7dc4c38b191e5f4ea15ba9ee3c3e891f Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Fri, 20 Nov 2020 14:09:58 +0900 Subject: [PATCH 07/10] fluent-ctl: Support to send control codes to windows service In addition enable to omit 2nd argument on windows (use "fluentdwinsvc" by default). Usage: fluent-ctl COMMAND [PID_OR_SVCNAME] Commands: shutdown restart flush reload Signed-off-by: Takuro Ashie --- lib/fluent/command/ctl.rb | 97 +++++++++++++++++++++++++++++---------- 1 file changed, 72 insertions(+), 25 deletions(-) diff --git a/lib/fluent/command/ctl.rb b/lib/fluent/command/ctl.rb index 625cae7c30..470ab57b57 100644 --- a/lib/fluent/command/ctl.rb +++ b/lib/fluent/command/ctl.rb @@ -17,26 +17,42 @@ require 'optparse' require 'fluent/env' require 'fluent/version' -require 'win32/event' if Fluent.windows? +if Fluent.windows? + require 'win32/event' + require 'win32/service' +end module Fluent class Ctl DEFAULT_OPTIONS = {} - UNIX_SIGNAL_MAP = { - shutdown: :TERM, - restart: :HUP, - flush: :USR1, - reload: :USR2, - } - WINDOWS_EVENT_MAP = { - shutdown: "", - restart: "HUP", - flush: "USR1", - reload: "USR2", - } + if Fluent.windows? + include Windows::ServiceConstants + include Windows::ServiceStructs + include Windows::ServiceFunctions + + WINDOWS_EVENT_MAP = { + shutdown: "", + restart: "HUP", + flush: "USR1", + reload: "USR2", + } + WINSVC_CONTROL_CODE_MAP = { + shutdown: SERVICE_CONTROL_STOP, + # 128 - 255: user-defined control code + # See https://docs.microsoft.com/en-us/windows/win32/api/winsvc/nf-winsvc-controlservice + restart: 128, + flush: 129, + reload: SERVICE_CONTROL_PARAMCHANGE, + } COMMAND_MAP = WINDOWS_EVENT_MAP else + UNIX_SIGNAL_MAP = { + shutdown: :TERM, + restart: :HUP, + flush: :USR1, + reload: :USR2, + } COMMAND_MAP = UNIX_SIGNAL_MAP end @@ -52,7 +68,7 @@ def initialize(argv = ARGV) def help_text text = "\n" if Fluent.windows? - text << "Usage: #{$PROGRAM_NAME} COMMAND PID_OR_SIGNAME\n" + text << "Usage: #{$PROGRAM_NAME} COMMAND [PID_OR_SVCNAME]\n" else text << "Usage: #{$PROGRAM_NAME} COMMAND PID\n" end @@ -75,9 +91,17 @@ def usage(msg = nil) def call if Fluent.windows? - call_windows_event(@command, @pid_or_signame) + if @pid_or_svcname =~ /^[0-9]+$/ + # Use as PID + return call_windows_event(@command, "fluentd_#{@pid_or_svcname}") + end + + unless call_winsvc_control_code(@command, @pid_or_svcname) + puts "Cannot send control code to #{@pid_or_svcname} service, try to send an event with this name ..." + call_windows_event(@command, @pid_or_svcname) + end else - call_signal(@command, @pid_or_signame) + call_signal(@command, @pid_or_svcname) end end @@ -88,12 +112,35 @@ def call_signal(command, pid) Process.kill(signal, pid.to_i) end - def call_windows_event(command, pid_or_signame) - if pid_or_signame =~ /^[0-9]+$/ - prefix = "fluentd_#{pid_or_signame}" - else - prefix = pid_or_signame + def call_winsvc_control_code(command, pid_or_svcname) + status = SERVICE_STATUS.new + + begin + handle_scm = OpenSCManager(nil, nil, SC_MANAGER_CONNECT) + FFI.raise_windows_error('OpenSCManager') if handle_scm == 0 + + handle_scs = OpenService(handle_scm, "fluentdwinsvc", SERVICE_STOP | SERVICE_PAUSE_CONTINUE | SERVICE_USER_DEFINED_CONTROL) + FFI.raise_windows_error('OpenService') if handle_scs == 0 + + control_code = WINSVC_CONTROL_CODE_MAP[command.to_sym] + + unless ControlService(handle_scs, control_code, status) + FFI.raise_windows_error('ControlService') + end + rescue => e + puts e + state = status[:dwCurrentState] + return state == SERVICE_STOPPED || state == SERVICE_STOP_PENDING + ensure + CloseServiceHandle(handle_scs) + CloseServiceHandle(handle_scm) end + + return true + end + + def call_windows_event(command, pid_or_svcname) + prefix = pid_or_svcname event_name = COMMAND_MAP[command.to_sym] suffix = event_name.empty? ? "" : "_#{event_name}" @@ -115,16 +162,16 @@ def parse_options! @opt_parser.parse!(@argv) @command = @argv[0] - @pid_or_signame = @argv[1] + @pid_or_svcname = @argv[1] || "fluentdwinsvc" usage("Command isn't specified!") if @command.nil? || @command.empty? usage("Unknown command: #{@command}") unless COMMAND_MAP.has_key?(@command.to_sym) if Fluent.windows? - usage("PID or SIGNAME isn't specified!") if @pid_or_signame.nil? || @pid_or_signame.empty? + usage("PID or SVCNAME isn't specified!") if @pid_or_svcname.nil? || @pid_or_svcname.empty? else - usage("PID isn't specified!") if @pid_or_signame.nil? || @pid_or_signame.empty? - usage("Invalid PID: #{pid}") unless @pid_or_signame =~ /^[0-9]+$/ + usage("PID isn't specified!") if @pid_or_svcname.nil? || @pid_or_svcname.empty? + usage("Invalid PID: #{pid}") unless @pid_or_svcname =~ /^[0-9]+$/ end end end From 284ca1bba0fbdf26bd6fd4e862a40c471e496fab Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Fri, 20 Nov 2020 14:30:21 +0900 Subject: [PATCH 08/10] winsvc: Add experimental code to support restart & flush trigger Note that you need to modify win32-event gem to take effect this code. Signed-off-by: Takuro Ashie --- lib/fluent/winsvc.rb | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lib/fluent/winsvc.rb b/lib/fluent/winsvc.rb index 9c7b3605d3..d1b893168d 100644 --- a/lib/fluent/winsvc.rb +++ b/lib/fluent/winsvc.rb @@ -78,6 +78,15 @@ def service_paramchange set_event("#{@service_name}_USR2") end + def service_user_defined_control(code) + case code + when 128 + set_event("#{@service_name}_HUP") + when 129 + set_event("#{@service_name}_USR1") + end + end + private def set_event(event_name) From b6ab3fad15385ffa9366aed304cb7c27ecd2160b Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Tue, 8 Dec 2020 08:23:59 +0900 Subject: [PATCH 09/10] fluent-ctl: Remove needless constants Signed-off-by: Takuro Ashie --- lib/fluent/command/ctl.rb | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/fluent/command/ctl.rb b/lib/fluent/command/ctl.rb index 470ab57b57..a042c989e2 100644 --- a/lib/fluent/command/ctl.rb +++ b/lib/fluent/command/ctl.rb @@ -31,7 +31,7 @@ class Ctl include Windows::ServiceStructs include Windows::ServiceFunctions - WINDOWS_EVENT_MAP = { + COMMAND_MAP = { shutdown: "", restart: "HUP", flush: "USR1", @@ -45,15 +45,13 @@ class Ctl flush: 129, reload: SERVICE_CONTROL_PARAMCHANGE, } - COMMAND_MAP = WINDOWS_EVENT_MAP else - UNIX_SIGNAL_MAP = { + COMMAND_MAP = { shutdown: :TERM, restart: :HUP, flush: :USR1, reload: :USR2, } - COMMAND_MAP = UNIX_SIGNAL_MAP end def initialize(argv = ARGV) From f4a368ac3189c0ef9fbcca2193b1342692a1b7c3 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Mon, 14 Dec 2020 12:30:50 +0900 Subject: [PATCH 10/10] test_supervisor: Check response of RPC server Signed-off-by: Takuro Ashie --- test/test_supervisor.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb index e07e0cc460..be1ddaa3f3 100644 --- a/test/test_supervisor.rb +++ b/test/test_supervisor.rb @@ -234,7 +234,7 @@ def test_rpc_server server.run_rpc_server sv.send(:install_main_process_signal_handlers) - Net::HTTP.get URI.parse('http://127.0.0.1:24447/api/plugins.flushBuffers') + response = Net::HTTP.get(URI.parse('http://127.0.0.1:24447/api/plugins.flushBuffers')) info_msg = '[info]: force flushing buffered events' + "\n" server.stop_rpc_server @@ -243,6 +243,7 @@ def test_rpc_server # This test will be passed in such environment. pend unless $log.out.logs.first + assert_equal('{"ok":true}', response) assert{ $log.out.logs.first.end_with?(info_msg) } ensure $log.out.reset if $log.out.is_a?(Fluent::Test::DummyLogDevice) @@ -275,9 +276,10 @@ def server.config server.run_rpc_server mock(server).restart(true) { nil } - Net::HTTP.get URI.parse('http://127.0.0.1:24447/api/plugins.flushBuffers') + response = Net::HTTP.get(URI.parse('http://127.0.0.1:24447/api/plugins.flushBuffers')) server.stop_rpc_server + assert_equal('{"ok":true}', response) end def test_load_config