diff --git a/.gitignore b/.gitignore index 7c5f4a570f..c5c0a791b5 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,5 @@ test/config/tmp/* make_dist.sh Gemfile.local .ruby-version +*.swp coverage/* diff --git a/appveyor.yml b/appveyor.yml new file mode 100644 index 0000000000..3486611a0c --- /dev/null +++ b/appveyor.yml @@ -0,0 +1,25 @@ +version: '{build}' + +install: + - SET PATH=C:\Ruby%ruby_version%\bin;%PATH% + - "%devkit%\\devkitvars.bat" + - ruby --version + - gem --version + - bundle install +build: off +test_script: + - bundle exec rake test TESTOPTS=-v + +environment: + matrix: + - ruby_version: "22-x64" + devkit: C:\Ruby21-x64\DevKit + - ruby_version: "22" + devkit: C:\Ruby21\DevKit + - ruby_version: "21-x64" + devkit: C:\Ruby21-x64\DevKit + - ruby_version: "21" + devkit: C:\Ruby21\DevKit +matrix: + allow_failures: + - ruby_version: "21" diff --git a/fluentd.gemspec b/fluentd.gemspec index 9680a073ac..10f1775d90 100644 --- a/fluentd.gemspec +++ b/fluentd.gemspec @@ -22,19 +22,25 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency("msgpack", [">= 0.5.11", "< 0.6.0"]) gem.add_runtime_dependency("json", [">= 1.4.3"]) gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"]) - gem.add_runtime_dependency("cool.io", [">= 1.2.2", "< 2.0.0"]) + gem.add_runtime_dependency("cool.io", [">= 1.4.1", "< 2.0.0"]) gem.add_runtime_dependency("http_parser.rb", [">= 0.5.1", "< 0.7.0"]) gem.add_runtime_dependency("sigdump", ["~> 0.2.2"]) gem.add_runtime_dependency("tzinfo", [">= 1.0.0"]) gem.add_runtime_dependency("tzinfo-data", [">= 1.0.0"]) gem.add_runtime_dependency("string-scrub", [">= 0.0.3"]) + if /mswin|mingw/ =~ RUBY_PLATFORM + gem.add_runtime_dependency("win32-service", ["~> 0.8.3"]) + gem.add_runtime_dependency("win32-ipc", ["~> 0.6.1"]) + gem.add_runtime_dependency("win32-event", ["~> 0.6.1"]) + gem.add_runtime_dependency("windows-pr", ["~> 1.2.3"]) + end gem.add_development_dependency("rake", [">= 0.9.2"]) - gem.add_development_dependency("flexmock") + gem.add_development_dependency("flexmock", ["~> 1.3.3"]) gem.add_development_dependency("parallel_tests", [">= 0.15.3"]) gem.add_development_dependency("simplecov", ["~> 0.6.4"]) gem.add_development_dependency("rr", [">= 1.0.0"]) gem.add_development_dependency("timecop", [">= 0.3.0"]) - gem.add_development_dependency("test-unit", ["~> 3.0.2"]) + gem.add_development_dependency("test-unit", ["~> 3.1.4"]) gem.add_development_dependency("test-unit-rr", ["~> 1.0.3"]) end diff --git a/lib/fluent/command/fluentd.rb b/lib/fluent/command/fluentd.rb index 47d0d45130..e95bc10599 100644 --- a/lib/fluent/command/fluentd.rb +++ b/lib/fluent/command/fluentd.rb @@ -16,6 +16,13 @@ require 'optparse' require 'fluent/supervisor' +require 'fluent/log' +require 'fluent/env' +require 'fluent/version' +require 'windows/library' +include Windows::Library + +$fluentdargv = Marshal.load(Marshal.dump(ARGV)) op = OptionParser.new op.version = Fluent::VERSION @@ -34,6 +41,10 @@ opts[:dry_run] = b } +op.on('--show-plugin-config=PLUGIN', "Show PLUGIN configuration and exit(ex: input:dummy)") {|plugin| + opts[:show_plugin_config] = plugin +} + op.on('-p', '--plugin DIR', "add plugin directory") {|s| opts[:plugin_dirs] << s } @@ -115,6 +126,21 @@ opts[:gem_install_path] = s } +if Fluent.windows? + op.on('-x', '--signame INTSIGNAME', "an object name which is used for Windows Service signal (Windows only)") {|s| + opts[:signame] = s + } + + op.on('--reg-winsvc MODE', "install/uninstall as Windows Service. (i: install, u: uninstall) (Windows only)") {|s| + opts[:regwinsvc] = s + } + + op.on('--reg-winsvc-fluentdopt OPTION', "specify fluentd option paramters for Windows Service. (Windows only)") {|s| + opts[:fluentdopt] = s + } +end + + (class< FLUENTD_WINSVC_NAME, + :host => nil, + :service_type => Service::WIN32_OWN_PROCESS, + :description => FLUENTD_WINSVC_DESC, + :start_type => Service::DEMAND_START, + :error_control => Service::ERROR_NORMAL, + :binary_path_name => ruby_path+" -C "+binary_path+" winsvc.rb", + :load_order_group => "", + :dependencies => [""], + :display_name => FLUENTD_WINSVC_DISPLAYNAME + ) + when 'u' + Service.delete(FLUENTD_WINSVC_NAME) + else + # none + end + exit 0 +end + +if fluentdopt = opts[:fluentdopt] + Win32::Registry::HKEY_LOCAL_MACHINE.open("SYSTEM\\CurrentControlSet\\Services\\fluentdwinsvc", Win32::Registry::KEY_ALL_ACCESS) do |reg| + reg['fluentdopt', Win32::Registry::REG_SZ] = fluentdopt + end + exit 0 +end + +require 'fluent/supervisor' Fluent::Supervisor.new(opts).start diff --git a/lib/fluent/config/configure_proxy.rb b/lib/fluent/config/configure_proxy.rb index ada36cb79c..c91b87b967 100644 --- a/lib/fluent/config/configure_proxy.rb +++ b/lib/fluent/config/configure_proxy.rb @@ -17,7 +17,7 @@ module Fluent module Config class ConfigureProxy - attr_accessor :name, :final, :param_name, :required, :multi, :alias, :argument, :params, :defaults, :sections + attr_accessor :name, :final, :param_name, :required, :multi, :alias, :argument, :params, :defaults, :descriptions, :sections # config_param :desc, :string, :default => '....' # config_set_default :buffer_type, :memory # @@ -48,6 +48,7 @@ def initialize(name, opts = {}) @argument = nil # nil: ignore argument @params = {} @defaults = {} + @descriptions = {} @sections = {} end @@ -157,6 +158,10 @@ def parameter_configuration(name, *args, &block) config_set_default(name, opts[:default]) end + if opts.has_key?(:desc) + config_set_desc(name, opts[:desc]) + end + [name, block, opts] end @@ -189,6 +194,17 @@ def config_set_default(name, defval) nil end + def config_set_desc(name, description) + name = name.to_sym + + if @descriptions.has_key?(name) + raise ArgumentError, "#{self.name}: description specified twice for #{name}" + end + + @descriptions[name] = description + nil + end + def config_section(name, *args, &block) unless block_given? raise ArgumentError, "#{self.name}: config_section requires block parameter" @@ -208,6 +224,20 @@ def config_section(name, *args, &block) name end + + def dump(level = 0) + dumped_config = "\n" + indent = " " * level + @params.each do |name, config| + dumped_config << "#{indent}#{name}: #{config[1][:type]}: <#{@defaults[name].inspect}>" + dumped_config << " # #{@descriptions[name]}" if @descriptions[name] + dumped_config << "\n" + end + @sections.each do |section_name, sub_proxy| + dumped_config << "#{indent}#{section_name}#{sub_proxy.dump(level + 1)}" + end + dumped_config + end end end end diff --git a/lib/fluent/configurable.rb b/lib/fluent/configurable.rb index 24751ae010..24b1fbff60 100644 --- a/lib/fluent/configurable.rb +++ b/lib/fluent/configurable.rb @@ -121,6 +121,10 @@ def merged_configure_proxy # p AnyGreatClass.dup.name #=> nil configurables.map{ |a| a.configure_proxy(a.name || a.object_id.to_s) }.reduce(:merge) end + + def dump + configure_proxy_map[self.to_s].dump + end end end diff --git a/lib/fluent/env.rb b/lib/fluent/env.rb index 2916ff669d..8f20db8906 100644 --- a/lib/fluent/env.rb +++ b/lib/fluent/env.rb @@ -21,4 +21,10 @@ module Fluent DEFAULT_LISTEN_PORT = 24224 DEFAULT_FILE_PERMISSION = 0644 DEFAULT_DIR_PERMISSION = 0755 + IS_WINDOWS = /mswin|mingw/ === RUBY_PLATFORM + private_constant :IS_WINDOWS + + def self.windows? + IS_WINDOWS + end end diff --git a/lib/fluent/load.rb b/lib/fluent/load.rb index c1f408cc5c..1a0d064443 100644 --- a/lib/fluent/load.rb +++ b/lib/fluent/load.rb @@ -15,6 +15,7 @@ # ignore setup error on Win or similar platform which doesn't support signal end require 'cool.io' + require 'fluent/env' require 'fluent/version' require 'fluent/log' diff --git a/lib/fluent/output.rb b/lib/fluent/output.rb index e6ad6180e8..652d711b49 100644 --- a/lib/fluent/output.rb +++ b/lib/fluent/output.rb @@ -91,6 +91,8 @@ def secondary_init(primary) $log.warn "type of secondary output should be same as primary output", :primary=>primary.class.to_s, :secondary=>self.class.to_s end end + + def inspect; "#<%s:%014x>" % [self.class.name, '0x%014x' % (__id__<<1)] end end diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index 868fa1060d..bce9bc41e9 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -21,6 +21,7 @@ def initialize(key, path, unique_id, mode="a+", symlink_path = nil) @path = path @unique_id = unique_id @file = File.open(@path, mode, DEFAULT_FILE_PERMISSION) + @file.binmode @file.sync = true @size = @file.stat.size FileUtils.ln_sf(@path, symlink_path) if symlink_path @@ -65,8 +66,20 @@ def open(&block) end def mv(path) - File.rename(@path, path) - @path = path + if Fluent.windows? + pos = @file.pos + @file.close + File.rename(@path, path) + @path = path + @file = File.open(@path, 'rb', DEFAULT_FILE_PERMISSION) + @file.sync = true + @size = @file.size + @file.pos = pos + @size = @file.stat.size + else + File.rename(@path, path) + @path = path + end end end diff --git a/lib/fluent/plugin/file_wrapper.rb b/lib/fluent/plugin/file_wrapper.rb new file mode 100644 index 0000000000..b6ecd41cec --- /dev/null +++ b/lib/fluent/plugin/file_wrapper.rb @@ -0,0 +1,120 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Fluent + module FileWrapper + def self.open(*args) + io = WindowsFile.new(*args).io + if block_given? + v = yield io + io.close + v + else + io + end + end + + def self.stat(path) + f = WindowsFile.new(path) + s = f.stat + f.close + s + end + end + + module WindowsFileExtension + attr_reader :path + + def stat + s = super + s.instance_variable_set :@ino, @ino + def s.ino; @ino; end + s + end + end + + # To open and get stat with setting FILE_SHARE_DELETE + class WindowsFile + require 'windows/file' + require 'windows/error' + require 'windows/handle' + require 'windows/nio' + + include Windows::Error + include Windows::File + include Windows::Handle + include Windows::NIO + + def initialize(path, mode='r', sharemode=FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE) + @path = path + @file_handle = INVALID_HANDLE_VALUE + @mode = mode + + + access, creationdisposition, seektoend = case mode.delete('b') + when "r" ; [FILE_GENERIC_READ , OPEN_EXISTING, false] + when "r+"; [FILE_GENERIC_READ | FILE_GENERIC_WRITE, OPEN_ALWAYS , false] + when "w" ; [FILE_GENERIC_WRITE , CREATE_ALWAYS, false] + when "w+"; [FILE_GENERIC_READ | FILE_GENERIC_WRITE, CREATE_ALWAYS, false] + when "a" ; [FILE_GENERIC_WRITE , OPEN_ALWAYS , true] + when "a+"; [FILE_GENERIC_READ | FILE_GENERIC_WRITE, OPEN_ALWAYS , true] + else raise "unknown mode '#{mode}'" + end + + @file_handle = CreateFile.call(@path, access, sharemode, + 0, creationdisposition, FILE_ATTRIBUTE_NORMAL, 0) + if @file_handle == INVALID_HANDLE_VALUE + err = GetLastError.call + if err == ERROR_FILE_NOT_FOUND || err == ERROR_ACCESS_DENIED + raise SystemCallError.new(2) + end + raise SystemCallError.new(err) + end + end + + def close + CloseHandle.call(@file_handle) + @file_handle = INVALID_HANDLE_VALUE + end + + def io + fd = _open_osfhandle(@file_handle, 0) + raise Errno::ENOENT if fd == -1 + io = File.for_fd(fd, @mode) + io.instance_variable_set :@ino, self.ino + io.instance_variable_set :@path, @path + io.extend WindowsFileExtension + io + end + + def ino + by_handle_file_information = '\0'*(4+8+8+8+4+4+4+4+4+4) #72bytes + + unless GetFileInformationByHandle.call(@file_handle, by_handle_file_information) + return 0 + end + + by_handle_file_information.unpack("I11Q1")[11] # fileindex + end + + def stat + s = File.stat(@path) + s.instance_variable_set :@ino, self.ino + def s.ino; @ino; end + s + end + end +end if Fluent.windows? diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 2270103519..5c403660d4 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -14,6 +14,12 @@ # limitations under the License. # +if Fluent.windows? + require_relative 'file_wrapper' +else + Fluent::FileWrapper = File +end + module Fluent class NewTailInput < Input Plugin.register_input('tail', self) @@ -76,7 +82,7 @@ def configure_tag def start if @pos_file - @pf_file = File.open(@pos_file, File::RDWR|File::CREAT, DEFAULT_FILE_PERMISSION) + @pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, DEFAULT_FILE_PERMISSION) @pf_file.sync = true @pf = PositionFile.parse(@pf_file) end @@ -144,7 +150,7 @@ def start_watchers(paths) pe = @pf[path] if @read_from_head && pe.read_inode.zero? begin - pe.update(File::Stat.new(path).ino, 0) + pe.update(FileWrapper.stat(path).ino, 0) rescue Errno::ENOENT $log.warn "#{path} not found. Continuing without tailing it." end @@ -494,9 +500,9 @@ def on_notify begin while true if @buffer.empty? - @io.read_nonblock(2048, @buffer) + @io.readpartial(2048, @buffer) else - @buffer << @io.read_nonblock(2048, @iobuf) + @buffer << @io.readpartial(2048, @iobuf) end while line = @buffer.slice!(/.*?\n/m) lines << line @@ -552,7 +558,7 @@ def initialize(path, log, &on_rotate) def on_notify begin - io = File.open(@path) + io = FileWrapper.open(@path) stat = io.stat inode = stat.ino fsize = stat.size @@ -568,7 +574,6 @@ def on_notify @on_rotate.call(io) io = nil end - @inode = inode @fsize = fsize ensure @@ -743,7 +748,7 @@ def configure_parser(conf) def start if @pos_file - @pf_file = File.open(@pos_file, File::RDWR|File::CREAT, DEFAULT_FILE_PERMISSION) + @pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, DEFAULT_FILE_PERMISSION) @pf_file.sync = true @pf = PositionFile.parse(@pf_file) end @@ -1017,9 +1022,9 @@ def on_notify begin while true if @buffer.empty? - @io.read_nonblock(2048, @buffer) + @io.readpartial(2048, @buffer) else - @buffer << @io.read_nonblock(2048, @iobuf) + @buffer << @io.readpartial(2048, @iobuf) end while line = @buffer.slice!(/.*?\n/m) lines << line @@ -1078,7 +1083,7 @@ def initialize(path, &on_rotate) def on_notify begin - io = File.open(@path) + io = FileWrapper.open(@path) stat = io.stat inode = stat.ino fsize = stat.size diff --git a/lib/fluent/plugin/out_exec_filter.rb b/lib/fluent/plugin/out_exec_filter.rb index 971090c9aa..4646e4a702 100644 --- a/lib/fluent/plugin/out_exec_filter.rb +++ b/lib/fluent/plugin/out_exec_filter.rb @@ -262,7 +262,8 @@ def start(command) def kill_child(join_wait) begin - Process.kill(:TERM, @pid) + signal = Fluent.windows? ? :KILL : :TERM + Process.kill(signal, @pid) rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM # Errno::ESRCH 'No such process', ignore # child process killed by signal chained from fluentd process diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index 37b5718ecd..2d08336704 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -83,11 +83,11 @@ def write(chunk) case @compress when nil - File.open(path, "a", DEFAULT_FILE_PERMISSION) {|f| + File.open(path, "ab", DEFAULT_FILE_PERMISSION) {|f| chunk.write_to(f) } when :gz - File.open(path, "a", DEFAULT_FILE_PERMISSION) {|f| + File.open(path, "ab", DEFAULT_FILE_PERMISSION) {|f| gz = Zlib::GzipWriter.new(f) chunk.write_to(gz) gz.close diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index b5e3976684..4f13275bef 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -269,6 +269,7 @@ def send_heartbeat_tcp(node) #sock.write FORWARD_TCP_HEARTBEAT_DATA node.heartbeat(true) ensure + sock.close_write sock.close end end @@ -347,6 +348,7 @@ def send_data(node, tag, chunk) node.heartbeat(false) return res # for test ensure + sock.close_write sock.close end end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index dffd5afbd9..e26cd91d7f 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -16,6 +16,14 @@ require 'fluent/load' require 'etc' +if Fluent.windows? + require 'windows/library' + require 'windows/system_info' + include Windows::Library + include Windows::SystemInfo + require 'win32/ipc' + require 'win32/event' +end module Fluent class Supervisor @@ -94,6 +102,8 @@ def self.default_options :without_source => false, :use_v1_config => true, :supervise => true, + :signame => nil, + :winsvcreg => nil, } end @@ -105,6 +115,7 @@ def initialize(opt) @use_v1_config = opt[:use_v1_config] @log_path = opt[:log_path] @dry_run = opt[:dry_run] + @show_plugin_config = opt[:show_plugin_config] @libs = opt[:libs] @plugin_dirs = opt[:plugin_dirs] @chgroup = opt[:chgroup] @@ -115,7 +126,15 @@ def initialize(opt) @suppress_interval = opt[:suppress_interval] @suppress_config_dump = opt[:suppress_config_dump] @without_source = opt[:without_source] + @signame = opt[:signame] + if Fluent.windows? + ruby_path = "\0" * 256 + GetModuleFileName.call(0,ruby_path,256) + ruby_path = ruby_path.rstrip.gsub(/\\/, '/') + @rubybin_dir = ruby_path[0, ruby_path.rindex("/")] + @winosvi = windows_version + end log_opts = {:suppress_repeated_stacktrace => opt[:suppress_repeated_stacktrace]} @log = LoggerInitializer.new(@log_path, @log_level, @chuser, @chgroup, log_opts) @finished = false @@ -124,6 +143,7 @@ def initialize(opt) def start @log.init + show_plugin_config if @show_plugin_config read_config apply_system_config @@ -152,6 +172,7 @@ def start change_privilege init_engine install_main_process_signal_handlers + install_main_process_winsigint_handler if Fluent.windows? run_configure finish_daemonize if @daemonize run_engine @@ -185,6 +206,17 @@ def dry_run exit 1 end + def show_plugin_config + $log.info "Show config for #{@show_plugin_config}" + name, type = @show_plugin_config.split(":") + plugin = Plugin.__send__("new_#{name}", type) + $log.info plugin.class.dump + exit 0 + rescue => e + $log.error "show config failed: #{e}" + exit 1 + end + def start_daemonize @wait_daemonize_pipe_r, @wait_daemonize_pipe_w = IO.pipe @@ -277,8 +309,23 @@ def supervise(&block) start_time = Time.now $log.info "starting fluentd-#{Fluent::VERSION}" - @main_pid = fork do - main_process(&block) + + if !Fluent.windows? + @main_pid = fork do + main_process(&block) + end + else + if @supervise + fluentd_spawn_cmd = @rubybin_dir+"/ruby.exe '"+@rubybin_dir+"/fluentd' " + $fluentdargv.each{|a| + fluentd_spawn_cmd << (a + " ") + } + fluentd_spawn_cmd << ("--no-supervisor") + $log.info "spawn command to main (windows) : " + fluentd_spawn_cmd + @main_pid = Process.spawn(fluentd_spawn_cmd) + else + main_process(&block) + end end if @daemonize && @wait_daemonize_pipe_w @@ -293,6 +340,11 @@ def supervise(&block) @main_pid = nil ecode = $?.to_i + if Fluent.windows? + @th_sv.kill if @th_sv.alive? + @th_sv.join rescue nil + end + $log.info "process finished", :code=>ecode if !@finished && Time.now - start_time < 1 @@ -304,6 +356,9 @@ def supervise(&block) def main_process(&block) begin block.call + if Fluent.windows? + @th_ma.join + end rescue Fluent::ConfigError $log.error "config error", :file=>@config_path, :error=>$!.to_s @@ -328,6 +383,8 @@ def main_process(&block) end def install_supervisor_signal_handlers + install_supervisor_winsigint_handler + trap :INT do $log.debug "fluentd supervisor process get SIGINT" supervisor_sigint_handler @@ -342,25 +399,33 @@ def install_supervisor_signal_handlers $log.debug "fluentd supervisor process get SIGHUP" $log.info "restarting" supervisor_sighup_handler - end + end unless Fluent.windows? trap :USR1 do $log.debug "fluentd supervisor process get SIGUSR1" supervisor_sigusr1_handler - end + end unless Fluent.windows? end def supervisor_sigint_handler @finished = true - if pid = @main_pid - # kill processes only still exists - unless Process.waitpid(pid, Process::WNOHANG) - begin - Process.kill(:INT, pid) - rescue Errno::ESRCH - # ignore processes already died + unless Fluent.windows? + if pid = @main_pid + # kill processes only still exists + unless Process.waitpid(pid, Process::WNOHANG) + begin + Process.kill(:INT, pid) + rescue Errno::ESRCH + # ignore processes already died + end end end + else + begin + @evtend.set + rescue + # nothing to do. + end end end @@ -496,10 +561,18 @@ def install_main_process_signal_handlers trap :INT do $log.debug "fluentd main process get SIGINT" - unless @finished - @finished = true - $log.debug "getting start to shutdown main process" - Fluent::Engine.stop + unless Fluent.windows? + unless @finished + @finished = true + $log.debug "getting start to shutdown main process" + Fluent::Engine.stop + end + else + begin + @evtend.set + rescue + # nothing to do. + end end end @@ -515,7 +588,7 @@ def install_main_process_signal_handlers trap :HUP do # TODO $log.debug "fluentd main process get SIGHUP" - end + end unless Fluent.windows? trap :USR1 do $log.debug "fluentd main process get SIGUSR1" @@ -532,11 +605,52 @@ def install_main_process_signal_handlers $log.warn "flushing thread error: #{e}" end }.run - end + end unless Fluent.windows? end def run_engine Fluent::Engine.run end + + def install_supervisor_winsigint_handler + @winintname = @signame || "fluentdwinsigint_#{Process.pid}" + @th_sv = Thread.new do + @evtend = Win32::Event.new(@winintname, true) + until @evtend.signaled? + sleep(1) + end + @evtend.close + + @finished = true + if pid = @main_pid + unless Process.waitpid(pid, Process::WNOHANG) + sigx = (@winosvi >= 6.2) ? (:INT) : (:KILL) + begin + Process.kill(sigx, pid) + rescue Errno::ESRCH + # ignore processes already died + end + end + end + end + end + + def install_main_process_winsigint_handler + @winintname = @signame || "fluentdwinsigint_#{Process.ppid}" + @th_ma = Thread.new do + @evtend = Win32::Event.open(@winintname) + until @evtend.signaled? + sleep(1) + end + @evtend.close + + unless @finished + @finished = true + $log.debug "getting start to shutdown main process" + Fluent::Engine.stop + end + end + $log.debug "install_main_process_winsigint_handler***** installed main winsiginthandler" + end end end diff --git a/lib/fluent/winsvc.rb b/lib/fluent/winsvc.rb new file mode 100644 index 0000000000..a092e33bc2 --- /dev/null +++ b/lib/fluent/winsvc.rb @@ -0,0 +1,56 @@ + +INTEVENTOBJ_NAME="fluentdwinsvc" + +begin + +require 'windows/debug' +require 'Windows/Library' +require 'win32/daemon' +require 'win32/event' + +include Win32 +include Windows::Library +include Windows::Debug + +def read_fluentdopt + require 'win32/Registry' + Win32::Registry::HKEY_LOCAL_MACHINE.open("SYSTEM\\CurrentControlSet\\Services\\fluentdwinsvc") do |reg| + reg.read("fluentdopt")[1] + end +end + +def service_main_start + ruby_path = 0.chr * 260 + GetModuleFileName.call(0, ruby_path,260) + ruby_path = ruby_path.rstrip.gsub(/\\/, '/') + rubybin_dir = ruby_path[0, ruby_path.rindex("/")] + opt = read_fluentdopt + Process.spawn (rubybin_dir+"/ruby.exe "+rubybin_dir+"/fluentd "+opt+" -x "+INTEVENTOBJ_NAME) +end + +class FluentdService < Daemon + @pid=0 + def service_main + opt = read_fluentdopt + @pid=service_main_start + while running? + sleep 10 + end + end + + def service_stop + ev = Win32::Event.open(INTEVENTOBJ_NAME) + ev.set + ev.close + if @pid > 0 + Porcess.waitpid(@pid) + end + end +end + +FluentdService.mainloop + +rescue Execption => err + raise +end + diff --git a/test/config/test_configure_proxy.rb b/test/config/test_configure_proxy.rb index db4869688b..18ff36911c 100644 --- a/test/config/test_configure_proxy.rb +++ b/test/config/test_configure_proxy.rb @@ -94,6 +94,128 @@ class TestConfigureProxy < ::Test::Unit::TestCase assert_raise(ArgumentError) { proxy.config_argument(:name, default: "name2") } end end + + sub_test_case '#config_set_desc' do + setup do + @proxy = Fluent::Config::ConfigureProxy.new(:section) + end + + test 'does not permit description specification twice w/ :desc option' do + @proxy.config_param(:name, :string, desc: "description") + assert_raise(ArgumentError) { @proxy.config_set_desc(:name, "description2") } + end + + test 'does not permit description specification twice' do + @proxy.config_param(:name, :string) + @proxy.config_set_desc(:name, "description") + assert_raise(ArgumentError) { @proxy.config_set_desc(:name, "description2") } + end + end + + sub_test_case '#dump' do + setup do + @proxy = Fluent::Config::ConfigureProxy.new(:section) + end + + test 'empty proxy' do + assert_equal("\n", @proxy.dump) + end + + test 'plain proxy w/o default value' do + @proxy.config_param(:name, :string) + assert_equal(< +CONFIG + end + + test 'plain proxy w/ default value' do + @proxy.config_param(:name, :string, default: "name1") + assert_equal(< +CONFIG + end + + test 'plain proxy w/ default value using config_set_default' do + @proxy.config_param(:name, :string) + @proxy.config_set_default(:name, "name1") + assert_equal(< +CONFIG + end + + test 'single sub proxy' do + @proxy.config_section(:sub) do + config_param(:name, :string, default: "name1") + end + assert_equal(< +CONFIG + end + + test 'nested sub proxy' do + @proxy.config_section(:sub) do + config_param(:name1, :string, default: "name1") + config_param(:name2, :string, default: "name2") + config_section(:sub2) do + config_param(:name3, :string, default: "name3") + config_param(:name4, :string, default: "name4") + end + end + assert_equal(< + name2: string: <"name2"> + sub2 + name3: string: <"name3"> + name4: string: <"name4"> +CONFIG + end + + sub_test_case 'w/ description' do + test 'single proxy' do + @proxy.config_param(:name, :string, desc: "description for name") + assert_equal(< # description for name +CONFIG + end + + test 'single proxy using config_set_desc' do + @proxy.config_param(:name, :string) + @proxy.config_set_desc(:name, "description for name") + assert_equal(< # description for name +CONFIG + end + + test 'sub proxy' do + @proxy.config_section(:sub) do + config_param(:name1, :string, default: "name1", desc: "desc1") + config_param(:name2, :string, default: "name2", desc: "desc2") + config_section(:sub2) do + config_param(:name3, :string, default: "name3") + config_param(:name4, :string, default: "name4", desc: "desc4") + end + end + assert_equal(< # desc1 + name2: string: <"name2"> # desc2 + sub2 + name3: string: <"name3"> + name4: string: <"name4"> # desc4 +CONFIG + end + end + end end end end diff --git a/test/plugin/test_buf_file.rb b/test/plugin/test_buf_file.rb index 5b07897c5a..2da05d16a8 100644 --- a/test/plugin/test_buf_file.rb +++ b/test/plugin/test_buf_file.rb @@ -13,9 +13,7 @@ class FileBufferChunkTest < Test::Unit::TestCase BUF_FILE_TMPDIR = File.expand_path(File.join(File.dirname(__FILE__), '..', 'tmp', 'buf_file_chunk')) def setup - if Dir.exists? BUF_FILE_TMPDIR - FileUtils.remove_entry_secure BUF_FILE_TMPDIR - end + FileUtils.rm_rf(BUF_FILE_TMPDIR, secure: true) FileUtils.mkdir_p BUF_FILE_TMPDIR end @@ -28,6 +26,7 @@ def filebufferchunk(key, unique, opts={}) end def test_init + omit "Windows doesn't support symlink" if Fluent.windows? chunk = filebufferchunk('key', 'init1') assert_equal 'key', chunk.key assert_equal 'init1', chunk.unique_id @@ -228,9 +227,7 @@ class FileBufferTest < Test::Unit::TestCase BUF_FILE_TMPDIR = File.expand_path(File.join(File.dirname(__FILE__), '..', 'tmp', 'buf_file')) def setup - if Dir.exists? BUF_FILE_TMPDIR - FileUtils.remove_entry_secure BUF_FILE_TMPDIR - end + FileUtils.rm_rf(BUF_FILE_TMPDIR, secure: true) FileUtils.mkdir_p BUF_FILE_TMPDIR end diff --git a/test/plugin/test_in_stream.rb b/test/plugin/test_in_stream.rb index 0c89c806a7..0ac5a4ae4c 100644 --- a/test/plugin/test_in_stream.rb +++ b/test/plugin/test_in_stream.rb @@ -122,4 +122,4 @@ def test_configure def connect UNIXSocket.new("#{TMP_DIR}/unix") end -end +end unless Fluent.windows? diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 233b0fbcd5..34008b4aa2 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -8,10 +8,20 @@ class TailInputTest < Test::Unit::TestCase def setup Fluent::Test.setup - FileUtils.rm_rf(TMP_DIR) + FileUtils.rm_rf(TMP_DIR, secure: true) + if File.exist?(TMP_DIR) + # ensure files are closed for Windows, on which deleted files + # are still visible from filesystem + GC.start(full_mark: true, immediate_mark: true, immediate_sweep: true) + FileUtils.remove_entry_secure(TMP_DIR) + end FileUtils.mkdir_p(TMP_DIR) end + def teardown + Fluent::Engine.stop + end + TMP_DIR = File.dirname(__FILE__) + "/../tmp/tail#{ENV['TEST_ENV_NUMBER']}" CONFIG = %[ @@ -46,7 +56,7 @@ def test_configure # TODO: Should using more better approach instead of sleep wait def test_emit - File.open("#{TMP_DIR}/tail.txt", "w") {|f| + File.open("#{TMP_DIR}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -56,7 +66,7 @@ def test_emit d.run do sleep 1 - File.open("#{TMP_DIR}/tail.txt", "a") {|f| + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -79,7 +89,7 @@ def test_emit_with_read_lines_limit(data) d.run do sleep 1 - File.open("#{TMP_DIR}/tail.txt", "a") {|f| + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts msg f.puts msg } @@ -94,7 +104,7 @@ def test_emit_with_read_lines_limit(data) end def test_emit_with_read_from_head - File.open("#{TMP_DIR}/tail.txt", "w") {|f| + File.open("#{TMP_DIR}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -104,7 +114,7 @@ def test_emit_with_read_from_head d.run do sleep 1 - File.open("#{TMP_DIR}/tail.txt", "a") {|f| + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -141,13 +151,13 @@ def test_rotate_file_with_read_from_head def test_rotate_file_with_write_old emits = sub_test_rotate_file(SINGLE_LINE_CONFIG) { |rotated_file| - File.open("#{TMP_DIR}/tail.txt", "w") { |f| } + File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } rotated_file.puts "test7" rotated_file.puts "test8" rotated_file.flush sleep 1 - File.open("#{TMP_DIR}/tail.txt", "a") { |f| + File.open("#{TMP_DIR}/tail.txt", "ab") { |f| f.puts "test5" f.puts "test6" } @@ -175,7 +185,7 @@ def test_rotate_file_with_write_old_and_no_new_file end def sub_test_rotate_file(config = nil) - file = File.open("#{TMP_DIR}/tail.txt", "w") + file = Fluent::FileWrapper.open("#{TMP_DIR}/tail.txt", "wb") file.puts "test1" file.puts "test2" file.flush @@ -195,10 +205,10 @@ def sub_test_rotate_file(config = nil) sleep 1 else sleep 1 - File.open("#{TMP_DIR}/tail.txt", "w") { |f| } + File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } sleep 1 - File.open("#{TMP_DIR}/tail.txt", "a") { |f| + File.open("#{TMP_DIR}/tail.txt", "ab") { |f| f.puts "test5" f.puts "test6" } @@ -212,21 +222,21 @@ def sub_test_rotate_file(config = nil) d.emits ensure - file.close + file.close if file end def test_lf - File.open("#{TMP_DIR}/tail.txt", "w") {|f| } + File.open("#{TMP_DIR}/tail.txt", "wb") {|f| } d = create_driver d.run do - File.open("#{TMP_DIR}/tail.txt", "a") {|f| + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.print "test3" } sleep 1 - File.open("#{TMP_DIR}/tail.txt", "a") {|f| + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts "test4" } sleep 1 @@ -238,14 +248,14 @@ def test_lf end def test_whitespace - File.open("#{TMP_DIR}/tail.txt", "w") {|f| } + File.open("#{TMP_DIR}/tail.txt", "wb") {|f| } d = create_driver d.run do sleep 1 - File.open("#{TMP_DIR}/tail.txt", "a") {|f| + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts " " # 4 spaces f.puts " 4 spaces" f.puts "4 spaces " @@ -269,7 +279,7 @@ def test_whitespace # multiline mode test def test_multiline - File.open("#{TMP_DIR}/tail.txt", "w") { |f| } + File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } d = create_driver %[ format multiline @@ -277,7 +287,7 @@ def test_multiline format_firstline /^[s]/ ] d.run do - File.open("#{TMP_DIR}/tail.txt", "a") { |f| + File.open("#{TMP_DIR}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -299,7 +309,7 @@ def test_multiline end def test_multiline_with_multiple_formats - File.open("#{TMP_DIR}/tail.txt", "w") { |f| } + File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } d = create_driver %[ format multiline @@ -309,7 +319,7 @@ def test_multiline_with_multiple_formats format_firstline /^[s]/ ] d.run do - File.open("#{TMP_DIR}/tail.txt", "a") { |f| + File.open("#{TMP_DIR}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -332,7 +342,7 @@ def test_multiline_with_multiple_formats def test_multilinelog_with_multiple_paths files = ["#{TMP_DIR}/tail1.txt", "#{TMP_DIR}/tail2.txt"] - files.each { |file| File.open(file, "w") { |f| } } + files.each { |file| File.open(file, "wb") { |f| } } d = create_driver(%[ path #{files[0]},#{files[1]} @@ -343,7 +353,7 @@ def test_multilinelog_with_multiple_paths ], false) d.run do files.each do |file| - File.open(file, 'a') { |f| + File.open(file, 'ab') { |f| f.puts "f #{file} line should be ignored" f.puts "s test1" f.puts "f test2" @@ -363,7 +373,7 @@ def test_multilinelog_with_multiple_paths end def test_multiline_without_firstline - File.open("#{TMP_DIR}/tail.txt", "w") { |f| } + File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } d = create_driver %[ format multiline @@ -372,7 +382,7 @@ def test_multiline_without_firstline format3 /(?baz \\d)/ ] d.run do - File.open("#{TMP_DIR}/tail.txt", "a") { |f| + File.open("#{TMP_DIR}/tail.txt", "ab") { |f| f.puts "foo 1" f.puts "bar 1" f.puts "baz 1" @@ -421,13 +431,13 @@ def test_expand_paths assert_equal EX_PATHS - [EX_PATHS.last], plugin.expand_paths.sort end - def test_refresh_watchers + def test_z_refresh_watchers plugin = create_driver(EX_CONFIG, false).instance sio = StringIO.new plugin.instance_eval do @pf = Fluent::NewTailInput::PositionFile.parse(sio) @loop = Coolio::Loop.new - end + end flexstub(Time) do |timeclass| timeclass.should_receive(:now).with_no_args.and_return(Time.new(2010, 1, 2, 3, 4, 5), Time.new(2010, 1, 2, 3, 4, 6), Time.new(2010, 1, 2, 3, 4, 7)) @@ -528,7 +538,7 @@ def test_receive_lines # Ensure that no fatal exception is raised when a file is missing and that # files that do exist are still tailed as expected. def test_missing_file - File.open("#{TMP_DIR}/tail.txt", "w") {|f| + File.open("#{TMP_DIR}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -547,7 +557,7 @@ def test_missing_file d = create_driver(config, false) d.run do sleep 1 - File.open("#{TMP_DIR}/tail.txt", "a") {|f| + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } diff --git a/test/plugin/test_out_exec.rb b/test/plugin/test_out_exec.rb index be5c4b0725..361eba63f8 100644 --- a/test/plugin/test_out_exec.rb +++ b/test/plugin/test_out_exec.rb @@ -5,7 +5,13 @@ class ExecOutputTest < Test::Unit::TestCase def setup Fluent::Test.setup - FileUtils.rm_rf(TMP_DIR) + FileUtils.rm_rf(TMP_DIR, secure: true) + if File.exist?(TMP_DIR) + # ensure files are closed for Windows, on which deleted files + # are still visible from filesystem + GC.start(full_mark: true, immediate_sweep: true) + FileUtils.remove_entry_secure(TMP_DIR) + end FileUtils.mkdir_p(TMP_DIR) end diff --git a/test/plugin/test_out_exec_filter.rb b/test/plugin/test_out_exec_filter.rb index 4915ef7c60..81b14cc5ee 100644 --- a/test/plugin/test_out_exec_filter.rb +++ b/test/plugin/test_out_exec_filter.rb @@ -25,7 +25,7 @@ def create_driver(conf = CONFIG, tag = 'test') def sed_unbuffered_support? @sed_unbuffered_support ||= lambda { - system("echo xxx | sed --unbuffered -l -e 's/x/y/g' >/dev/null 2>&1") + system("echo xxx | sed --unbuffered -l -e 's/x/y/g' >#{IO::NULL} 2>&1") $?.success? }.call end diff --git a/test/plugin/test_out_file.rb b/test/plugin/test_out_file.rb index 4d10107845..daa50fc5da 100644 --- a/test/plugin/test_out_file.rb +++ b/test/plugin/test_out_file.rb @@ -62,7 +62,7 @@ def test_default_localtime d = create_driver(%[path #{TMP_DIR}/out_file_test]) time = Time.parse("2011-01-02 13:14:15 UTC").to_i - with_timezone('Asia/Taipei') do + with_timezone(Fluent.windows? ? 'NST-8' : 'Asia/Taipei') do d.emit({"a"=>1}, time) d.expect_format %[2011-01-02T21:14:15+08:00\ttest\t{"a":1}\n] d.run @@ -121,7 +121,7 @@ def check_gzipped_result(path, expect) # Zlib::GzipReader has a bug of concatenated file: https://bugs.ruby-lang.org/issues/9790 # Following code from https://www.ruby-forum.com/topic/971591#979520 result = '' - File.open(path) { |io| + File.open(path, "rb") { |io| loop do gzr = Zlib::GzipReader.new(io) result << gzr.read @@ -232,6 +232,7 @@ def test_write_with_append end def test_write_with_symlink + omit "Windows doesn't support symlink" if Fluent.windows? conf = CONFIG + %[ symlink_path #{SYMLINK_PATH} ] diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index c3a4dc2076..edafd190e5 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -303,6 +303,7 @@ def initialize(sock, log, on_message) def write(data) @sock.write data rescue => e + @sock.close_write @sock.close end else @@ -312,6 +313,7 @@ def write(data) end def close + @sock.close_write @sock.close end } @@ -333,7 +335,10 @@ def close end sleep # wait for connection to be closed by client ensure - sock.close if sock + if sock + sock.close_write + sock.close + end end end end