Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sigdump func to fluent-ctl #3680

Merged
merged 15 commits into from
May 20, 2022
3 changes: 3 additions & 0 deletions lib/fluent/command/ctl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Ctl
restart: "HUP",
flush: "USR1",
reload: "USR2",
dump: "CONT",
}
WINSVC_CONTROL_CODE_MAP = {
shutdown: SERVICE_CONTROL_STOP,
Expand All @@ -44,13 +45,15 @@ class Ctl
restart: 128,
flush: 129,
reload: SERVICE_CONTROL_PARAMCHANGE,
dump: 130,
}
else
COMMAND_MAP = {
shutdown: :TERM,
restart: :HUP,
flush: :USR1,
reload: :USR2,
dump: :CONT,
}
end

Expand Down
121 changes: 101 additions & 20 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

require 'fileutils'
require 'open3'
require 'pathname'

require 'fluent/config'
require 'fluent/counter'
Expand Down Expand Up @@ -106,6 +107,16 @@ def run_rpc_server
end
nil
}
@rpc_server.mount_proc('/api/processes.dump') { |req, res|
$log.debug "fluentd RPC got /api/processes.dump request"
if Fluent.windows?
supervisor_dump_handler_for_windows
else
Process.kill :CONT, $$
send_signal_to_workers(:CONT)
end
nil
}
ashie marked this conversation as resolved.
Show resolved Hide resolved
@rpc_server.mount_proc('/api/plugins.flushBuffers') { |req, res|
$log.debug "fluentd RPC got /api/plugins.flushBuffers request"
if Fluent.windows?
Expand Down Expand Up @@ -215,44 +226,50 @@ def install_windows_event_handler
Thread.new do
ipc = Win32::Ipc.new(nil)
events = [
Win32::Event.new("#{@pid_signame}_STOP_EVENT_THREAD"),
Win32::Event.new("#{@pid_signame}"),
Win32::Event.new("#{@pid_signame}_HUP"),
Win32::Event.new("#{@pid_signame}_USR1"),
Win32::Event.new("#{@pid_signame}_USR2"),
{win32_event: Win32::Event.new("#{@pid_signame}_STOP_EVENT_THREAD"), action: :stop_event_thread},
{win32_event: Win32::Event.new("#{@pid_signame}"), action: :stop},
{win32_event: Win32::Event.new("#{@pid_signame}_HUP"), action: :hup},
{win32_event: Win32::Event.new("#{@pid_signame}_USR1"), action: :usr1},
{win32_event: Win32::Event.new("#{@pid_signame}_USR2"), action: :usr2},
{win32_event: Win32::Event.new("#{@pid_signame}_CONT"), action: :cont},
]
if @signame
signame_events = [
Win32::Event.new("#{@signame}"),
Win32::Event.new("#{@signame}_HUP"),
Win32::Event.new("#{@signame}_USR1"),
Win32::Event.new("#{@signame}_USR2"),
{win32_event: Win32::Event.new("#{@signame}"), action: :stop},
{win32_event: Win32::Event.new("#{@signame}_HUP"), action: :hup},
{win32_event: Win32::Event.new("#{@signame}_USR1"), action: :usr1},
{win32_event: Win32::Event.new("#{@signame}_USR2"), action: :usr2},
{win32_event: Win32::Event.new("#{@signame}_CONT"), action: :cont},
]
events.concat(signame_events)
end
begin
loop do
idx = ipc.wait_any(events, Windows::Synchronize::INFINITE)
if idx > 0 && idx <= events.length
$log.debug("Got Win32 event \"#{events[idx - 1].name}\"")
ipc_idx = ipc.wait_any(events.map {|e| e[:win32_event]}, Windows::Synchronize::INFINITE)
event_idx = ipc_idx - 1

if event_idx >= 0 && event_idx < events.length
$log.debug("Got Win32 event \"#{events[event_idx][:win32_event].name}\"")
else
$log.warn("Unexpected reutrn value of Win32::Ipc#wait_any: #{idx}")
$log.warn("Unexpected return value of Win32::Ipc#wait_any: #{ipc_idx}")
end
case idx
when 2, 6
case events[event_idx][:action]
when :stop
stop(true)
when 3, 7
when :hup
supervisor_sighup_handler
when 4, 8
when :usr1
supervisor_sigusr1_handler
when 5, 9
when :usr2
supervisor_sigusr2_handler
when 1
when :cont
supervisor_dump_handler_for_windows
when :stop_event_thread
break
end
end
ensure
events.each { |event| event.close }
events.each { |event| event[:win32_event].close }
end
end
end
Expand Down Expand Up @@ -302,6 +319,26 @@ def supervisor_sigusr2_handler
$log.error "Failed to reload config file: #{e}"
end

def supervisor_dump_handler_for_windows
# As for UNIX-like, SIGCONT signal to each process makes the process output its dump-file,
# and it is implemented before the implementation of the function for Windows.
# It is possible to trap SIGCONT and handle it here also on UNIX-like,
# but for backward compatibility, this handler is currently for a Windows-only.
raise "[BUG] This function is for Windows ONLY." unless Fluent.windows?

Thread.new do
begin
FluentSigdump.dump_windows
rescue => e
$log.error "failed to dump: #{e}"
end
end

send_signal_to_workers(:CONT)
rescue => e
$log.error "failed to dump: #{e}"
end

def kill_worker
if config[:worker_pid]
pids = config[:worker_pid].clone
Expand Down Expand Up @@ -358,6 +395,14 @@ def send_command_to_workers(signal)
restart(true)
when :USR2
reload
when :CONT
dump_all_windows_workers
end
end

def dump_all_windows_workers
@monitors.each do |m|
m.send_command("DUMP\n")
end
end
end
Expand Down Expand Up @@ -896,6 +941,9 @@ def install_main_process_command_handlers
when "RELOAD"
$log.debug "fluentd main process get #{cmd} command"
reload_config
when "DUMP"
$log.debug "fluentd main process get #{cmd} command"
dump
else
$log.warn "fluentd main process get unknown command [#{cmd}]"
end
Expand Down Expand Up @@ -945,6 +993,16 @@ def reload_config
end
end

def dump
Thread.new do
begin
FluentSigdump.dump_windows
rescue => e
$log.error("failed to dump: #{e}")
end
end
end

def logging_with_console_output
yield $log
unless @log.stdout?
Expand Down Expand Up @@ -1054,4 +1112,27 @@ def build_spawn_command
fluentd_spawn_cmd
end
end

module FluentSigdump
def self.dump_windows
raise "[BUG] WindowsSigdump::dump is for Windows ONLY." unless Fluent.windows?

# Sigdump outputs under `/tmp` dir without `SIGDUMP_PATH` specified,
# but `/tmp` dir may not exist on Windows by default.
# So use the systemroot-temp-dir instead.
dump_filepath = ENV['SIGDUMP_PATH'].nil? || ENV['SIGDUMP_PATH'].empty? \
? "#{ENV['windir']}/Temp/fluentd-sigdump-#{Process.pid}.log"
: get_path_with_pid(ENV['SIGDUMP_PATH'])

require 'sigdump'
Sigdump.dump(dump_filepath)

$log.info "dump to #{dump_filepath}."
end

def self.get_path_with_pid(raw_path)
path = Pathname.new(raw_path)
path.sub_ext("-#{Process.pid}#{path.extname}").to_s
end
end
end
2 changes: 2 additions & 0 deletions lib/fluent/winsvc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def service_user_defined_control(code)
set_event("#{@service_name}_HUP")
when 129
set_event("#{@service_name}_USR1")
when 130
set_event("#{@service_name}_CONT")
end
end

Expand Down
12 changes: 9 additions & 3 deletions test/plugin/test_in_object_space.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@ def waiting(seconds, instance)
end

class FailObject
def self.class
raise "error"
end
end

def setup
Fluent::Test.setup
# Overriding this behavior in the global scope will have an unexpected influence on other tests.
# So this should be overridden here and be removed in `teardown`.
def FailObject.class
raise "FailObject error for tests in ObjectSpaceInputTest."
end
end

def teardown
FailObject.singleton_class.remove_method(:class)
end

TESTCONFIG = %[
Expand Down
43 changes: 43 additions & 0 deletions test/test_supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,49 @@ def server.config
$log.out.reset if $log && $log.out && $log.out.respond_to?(:reset)
end

data("Normal", {raw_path: "C:\\Windows\\Temp\\sigdump.log", expected: "C:\\Windows\\Temp\\sigdump-#{$$}.log"})
data("UNIX style", {raw_path: "/Windows/Temp/sigdump.log", expected: "/Windows/Temp/sigdump-#{$$}.log"})
data("No extension", {raw_path: "C:\\Windows\\Temp\\sigdump", expected: "C:\\Windows\\Temp\\sigdump-#{$$}"})
data("Multi-extension", {raw_path: "C:\\Windows\\Temp\\sig.dump.bk", expected: "C:\\Windows\\Temp\\sig.dump-#{$$}.bk"})
def test_fluentsigdump_get_path_with_pid(data)
p data
path = Fluent::FluentSigdump.get_path_with_pid(data[:raw_path])
assert_equal(data[:expected], path)
end

def test_supervisor_event_dump_windows
omit "Only for Windows, alternative to UNIX signals" unless Fluent.windows?

server = DummyServer.new
def server.config
{:signame => "TestFluentdEvent"}
end
server.install_windows_event_handler

assert_rr do
# Have to use mock because `Sigdump.dump` seems to be somehow incompatible with RR.
# The `mock(server).restart(true) { nil }` line in `test_rpc_server_windows` cause the next error.
# Failure: test_supervisor_event_dump_windows(SupervisorTest):
# class()
# Called 0 times.
# Expected 1 times.
# .../Ruby26-x64/lib/ruby/gems/2.6.0/gems/sigdump-0.2.4/lib/sigdump.rb:74:in `block in dump_object_count'
# 73: ObjectSpace.each_object {|o|
# 74: c = o.class <-- HERE!
mock(Sigdump).dump(anything)

begin
sleep 0.1 # Wait for starting windows event thread
event = Win32::Event.open("TestFluentdEvent_CONT")
event.set
event.close
sleep 1.0 # Wait for dumping
ensure
server.stop_windows_event_thread
end
end
end

data(:ipv4 => ["0.0.0.0", "127.0.0.1", false],
:ipv6 => ["[::]", "[::1]", true],
:localhost_ipv4 => ["localhost", "127.0.0.1", false])
Expand Down