From dfd4fc32ffa223c3bdb76f7d33bd4ee71a1bbeac Mon Sep 17 00:00:00 2001 From: Fujimoto Seiji Date: Fri, 1 Jul 2022 00:52:30 +0900 Subject: [PATCH 01/10] Add support for simple inter-worker locking This is the first cut at a simple locking mechanism that allows Fluentd workers to coordinate each other. Here is how it works: * ServerModule prepares a lockfile directory and share that directory path with its workers (through ENV). * Now, Workers can define critical sessions by creating a lockfile in that directory. * The actual lock was held by flock, so it's automatically freed by OS even when Worker gets killed in the middle of critical session. I'm going to use this mechanism to support concurrent appends in out_file, but should be useful for other plugins as well. Signed-off-by: Fujimoto Seiji --- lib/fluent/plugin/base.rb | 19 +++++++++++++++++++ lib/fluent/supervisor.rb | 9 +++++++++ 2 files changed, 28 insertions(+) diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb index 601e216b81..40235bb5de 100644 --- a/lib/fluent/plugin/base.rb +++ b/lib/fluent/plugin/base.rb @@ -51,6 +51,12 @@ def fluentd_worker_id @_fluentd_worker_id end + def fluentd_lockdir + return @_fluentd_lockdir if @_fluentd_lockdir + @_fluentd_lockdir = ENV['FLUENTD_LOCKDIR'] + @_fluentd_lockdir + end + def configure(conf) if Fluent::Engine.supervisor_mode || (conf.respond_to?(:for_this_worker?) && conf.for_this_worker?) workers = if conf.target_worker_ids && !conf.target_worker_ids.empty? @@ -70,6 +76,19 @@ def multi_workers_ready? true end + def acquire_worker_lock(name, &block) + if fluentd_lockdir.nil? + raise RuntimeError, "fail to create lockfile on '#{fluentd_lockdir}'" + end + + name = name.gsub(/[^a-zA-Z0-9]/, "_") + lockfile = "fluentd-#{name}.lock" + File.open(File.join(fluentd_lockdir, lockfile), "w") do |f| + f.flock(File::LOCK_EX) + block.call() + end + end + def string_safe_encoding(str) unless str.valid_encoding? str = str.scrub('?') diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 0bd82ece47..cab076c50c 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -50,6 +50,9 @@ def before_run @rpc_server = nil @counter = nil + @fluentd_lockdir = Dir.mktmpdir("fluentd-lock-") + ENV['FLUENTD_LOCKDIR'] = @fluentd_lockdir + if config[:rpc_endpoint] @rpc_endpoint = config[:rpc_endpoint] @enable_get_dump = config[:enable_get_dump] @@ -79,9 +82,15 @@ def after_run stop_windows_event_thread if Fluent.windows? stop_rpc_server if @rpc_endpoint stop_counter_server if @counter + cleanup_lockdir Fluent::Supervisor.cleanup_resources end + def cleanup_lockdir + FileUtils.rm(Dir.glob(File.join(@fluentd_lockdir, "fluentd-*.lock"))) + FileUtils.rmdir(@fluentd_lockdir) + end + def run_rpc_server @rpc_server = RPC::Server.new(@rpc_endpoint, $log) From 6a18e4da768256178d90370a546342af1f5ac4f2 Mon Sep 17 00:00:00 2001 From: Fujimoto Seiji Date: Fri, 1 Jul 2022 00:54:55 +0900 Subject: [PATCH 02/10] out_file: Add a simple lock to support concurrent appends Previously, out_file did not support any locking on append mode. With this, out_file will acquire a worker lock when appending to a file, which makes the following configuration just works: workers 2 @type file append true ... Signed-off-by: Fujimoto Seiji --- lib/fluent/plugin/out_file.rb | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index f09c149eb1..7de4b90979 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -217,7 +217,13 @@ def write(chunk) end if @append - writer.call(path, chunk) + if @need_lock + acquire_worker_lock(path) do + writer.call(path, chunk) + end + else + writer.call(path, chunk) + end else find_filepath_available(path, with_lock: @need_lock) do |actual_path| writer.call(actual_path, chunk) From a538b9bcc6561aac500823f1b7332edae8c2d0cb Mon Sep 17 00:00:00 2001 From: Fujimoto Seiji Date: Fri, 1 Jul 2022 11:48:05 +0900 Subject: [PATCH 03/10] Apply feedback from kou on GitHub#3808 Signed-off-by: Fujimoto Seiji --- lib/fluent/error.rb | 3 +++ lib/fluent/plugin/base.rb | 8 +++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/fluent/error.rb b/lib/fluent/error.rb index 564b1c8d63..251094c820 100644 --- a/lib/fluent/error.rb +++ b/lib/fluent/error.rb @@ -28,6 +28,9 @@ def to_s class InvalidRootDirectory < UnrecoverableError end + class InvalidLockDirectory < UnrecoverableError + end + # For internal use class UncatchableError < Exception end diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb index 40235bb5de..85dcd65254 100644 --- a/lib/fluent/plugin/base.rb +++ b/lib/fluent/plugin/base.rb @@ -52,9 +52,7 @@ def fluentd_worker_id end def fluentd_lockdir - return @_fluentd_lockdir if @_fluentd_lockdir - @_fluentd_lockdir = ENV['FLUENTD_LOCKDIR'] - @_fluentd_lockdir + @_fluentd_lockdir ||= ENV['FLUENTD_LOCKDIR'] end def configure(conf) @@ -78,14 +76,14 @@ def multi_workers_ready? def acquire_worker_lock(name, &block) if fluentd_lockdir.nil? - raise RuntimeError, "fail to create lockfile on '#{fluentd_lockdir}'" + raise InvalidLockDirectory, "can't acquire lock because FLUENTD_LOCKDIR isn't set" end name = name.gsub(/[^a-zA-Z0-9]/, "_") lockfile = "fluentd-#{name}.lock" File.open(File.join(fluentd_lockdir, lockfile), "w") do |f| f.flock(File::LOCK_EX) - block.call() + yield end end From 75ef92f787106de0a054174c2423a7f2fee00350 Mon Sep 17 00:00:00 2001 From: Fujimoto Seiji Date: Fri, 1 Jul 2022 11:50:57 +0900 Subject: [PATCH 04/10] Use automatic cleanup with FLUENTD_LOCKDIR This is a lot safer than doing a manual cleanup, and it also allows to clean up directories in a more rubust manner. Signed-off-by: Fujimoto Seiji --- lib/fluent/supervisor.rb | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index cab076c50c..079eb4709b 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -50,9 +50,6 @@ def before_run @rpc_server = nil @counter = nil - @fluentd_lockdir = Dir.mktmpdir("fluentd-lock-") - ENV['FLUENTD_LOCKDIR'] = @fluentd_lockdir - if config[:rpc_endpoint] @rpc_endpoint = config[:rpc_endpoint] @enable_get_dump = config[:enable_get_dump] @@ -82,15 +79,9 @@ def after_run stop_windows_event_thread if Fluent.windows? stop_rpc_server if @rpc_endpoint stop_counter_server if @counter - cleanup_lockdir Fluent::Supervisor.cleanup_resources end - def cleanup_lockdir - FileUtils.rm(Dir.glob(File.join(@fluentd_lockdir, "fluentd-*.lock"))) - FileUtils.rmdir(@fluentd_lockdir) - end - def run_rpc_server @rpc_server = RPC::Server.new(@rpc_endpoint, $log) @@ -884,7 +875,11 @@ def supervise se = ServerEngine.create(ServerModule, WorkerModule){ Fluent::Supervisor.load_config(@config_path, params) } - se.run + + Dir.mktmpdir("fluentd-lock-") do |lockdir| + ENV['FLUENTD_LOCKDIR'] = lockdir + se.run + end end def install_main_process_signal_handlers From 2ebce289954fc9d0e208672b7ef8f1415888d7bf Mon Sep 17 00:00:00 2001 From: Fujimoto Seiji Date: Fri, 1 Jul 2022 12:31:31 +0900 Subject: [PATCH 05/10] Raise error on configuration if lockdir is unavailable Signed-off-by: Fujimoto Seiji --- lib/fluent/plugin/out_file.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index 7de4b90979..7bf25a1b6e 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -188,6 +188,10 @@ def configure(conf) condition = Gem::Dependency.new('', [">= 2.7.0", "< 3.1.0"]) @need_ruby_on_macos_workaround = true if condition.match?('', RUBY_VERSION) end + + if @need_lock && @append && fluentd_lockdir.nil? + raise InvalidLockDirectory, "must set FLUENTD_LOCKDIR on multi-worker append mode" + end end def multi_workers_ready? From 0a7a590578e7d59de75f841e18b82494e7390e30 Mon Sep 17 00:00:00 2001 From: Fujimoto Seiji Date: Fri, 1 Jul 2022 12:53:18 +0900 Subject: [PATCH 06/10] Apply feedback from ashie on GitHub#3808 Signed-off-by: Fujimoto Seiji --- lib/fluent/plugin/base.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb index 85dcd65254..28485dcbd5 100644 --- a/lib/fluent/plugin/base.rb +++ b/lib/fluent/plugin/base.rb @@ -34,6 +34,7 @@ def initialize @_state = State.new(false, false, false, false, false, false, false, false, false) @_context_router = nil @_fluentd_worker_id = nil + @_fluentd_lockdir = nil @under_plugin_development = false end @@ -74,7 +75,7 @@ def multi_workers_ready? true end - def acquire_worker_lock(name, &block) + def acquire_worker_lock(name) if fluentd_lockdir.nil? raise InvalidLockDirectory, "can't acquire lock because FLUENTD_LOCKDIR isn't set" end From d31631ec088c74cf7f190ed2639658394086e2c9 Mon Sep 17 00:00:00 2001 From: Fujimoto Seiji Date: Fri, 1 Jul 2022 14:26:55 +0900 Subject: [PATCH 07/10] Add testcase for worker lock functions Signed-off-by: Fujimoto Seiji --- lib/fluent/plugin/base.rb | 10 ++++++---- test/plugin/test_base.rb | 22 ++++++++++++++++++++++ 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb index 28485dcbd5..185d7ec5e5 100644 --- a/lib/fluent/plugin/base.rb +++ b/lib/fluent/plugin/base.rb @@ -75,14 +75,16 @@ def multi_workers_ready? true end + def worker_lockfile(name) + name = name.gsub(/[^a-zA-Z0-9]/, "_") + File.join(fluentd_lockdir, "fluentd-#{name}.lock") + end + def acquire_worker_lock(name) if fluentd_lockdir.nil? raise InvalidLockDirectory, "can't acquire lock because FLUENTD_LOCKDIR isn't set" end - - name = name.gsub(/[^a-zA-Z0-9]/, "_") - lockfile = "fluentd-#{name}.lock" - File.open(File.join(fluentd_lockdir, lockfile), "w") do |f| + File.open(worker_lockfile(name), "w") do |f| f.flock(File::LOCK_EX) yield end diff --git a/test/plugin/test_base.rb b/test/plugin/test_base.rb index 51d55c1b69..307466172b 100644 --- a/test/plugin/test_base.rb +++ b/test/plugin/test_base.rb @@ -1,4 +1,5 @@ require_relative '../helper' +require 'tmpdir' require 'fluent/plugin/base' module FluentPluginBaseTest @@ -112,4 +113,25 @@ class FluentPluginBaseTest::DummyPlugin2 < Fluent::Plugin::TestBase assert_equal 1, logger.logs.size assert{ logger.logs.first.include?("invalid byte sequence is replaced in ") } end + + test 'acquire worker lock' do + Dir.mktmpdir("test-fluentd-lock-") do |lockdir| + ENV['FLUENTD_LOCKDIR'] = lockdir + lockfile = @p.worker_lockfile("worker_test") + + @p.acquire_worker_lock("worker_test") do + # With LOCK_NB set, flock() returns `false` when the + # file is already locked. + File.open(lockfile, "w") do |f| + assert_equal false, f.flock(File::LOCK_EX|File::LOCK_NB) + end + end + + # Lock should be release by now. In that case, flock + # must return 0. + File.open(lockfile, "w") do |f| + assert_equal 0, f.flock(File::LOCK_EX|File::LOCK_NB) + end + end + end end From d9c793e3716893a2c9de54caef6b81038ef7b7fe Mon Sep 17 00:00:00 2001 From: Fujimoto Seiji Date: Fri, 1 Jul 2022 15:26:03 +0900 Subject: [PATCH 08/10] Apply feeback from kou on GitHub#3808 Signed-off-by: Fujimoto Seiji --- lib/fluent/plugin/base.rb | 14 +++++--------- lib/fluent/supervisor.rb | 4 ++-- test/plugin/test_base.rb | 13 +++++++------ 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb index 185d7ec5e5..699e2a32eb 100644 --- a/lib/fluent/plugin/base.rb +++ b/lib/fluent/plugin/base.rb @@ -34,7 +34,7 @@ def initialize @_state = State.new(false, false, false, false, false, false, false, false, false) @_context_router = nil @_fluentd_worker_id = nil - @_fluentd_lockdir = nil + @_fluentd_lock_dir = ENV['FLUENTD_LOCK_DIR'] @under_plugin_development = false end @@ -52,10 +52,6 @@ def fluentd_worker_id @_fluentd_worker_id end - def fluentd_lockdir - @_fluentd_lockdir ||= ENV['FLUENTD_LOCKDIR'] - end - def configure(conf) if Fluent::Engine.supervisor_mode || (conf.respond_to?(:for_this_worker?) && conf.for_this_worker?) workers = if conf.target_worker_ids && !conf.target_worker_ids.empty? @@ -75,16 +71,16 @@ def multi_workers_ready? true end - def worker_lockfile(name) + def get_lock_path(name) name = name.gsub(/[^a-zA-Z0-9]/, "_") - File.join(fluentd_lockdir, "fluentd-#{name}.lock") + File.join(@_fluentd_lock_dir, "fluentd-#{name}.lock") end def acquire_worker_lock(name) - if fluentd_lockdir.nil? + if @_fluentd_lock_dir.nil? raise InvalidLockDirectory, "can't acquire lock because FLUENTD_LOCKDIR isn't set" end - File.open(worker_lockfile(name), "w") do |f| + File.open(get_lock_path(name), "w") do |f| f.flock(File::LOCK_EX) yield end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 079eb4709b..854e53cb12 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -876,8 +876,8 @@ def supervise Fluent::Supervisor.load_config(@config_path, params) } - Dir.mktmpdir("fluentd-lock-") do |lockdir| - ENV['FLUENTD_LOCKDIR'] = lockdir + Dir.mktmpdir("fluentd-lock-") do |lock_dir| + ENV['FLUENTD_LOCK_DIR'] = lock_dir se.run end end diff --git a/test/plugin/test_base.rb b/test/plugin/test_base.rb index 307466172b..32f547a467 100644 --- a/test/plugin/test_base.rb +++ b/test/plugin/test_base.rb @@ -115,21 +115,22 @@ class FluentPluginBaseTest::DummyPlugin2 < Fluent::Plugin::TestBase end test 'acquire worker lock' do - Dir.mktmpdir("test-fluentd-lock-") do |lockdir| - ENV['FLUENTD_LOCKDIR'] = lockdir - lockfile = @p.worker_lockfile("worker_test") + Dir.mktmpdir("test-fluentd-lock-") do |lock_dir| + ENV['FLUENTD_LOCK_DIR'] = lock_dir + p = FluentPluginBaseTest::DummyPlugin.new + lock_path = p.get_lock_path("test_base") - @p.acquire_worker_lock("worker_test") do + p.acquire_worker_lock("test_base") do # With LOCK_NB set, flock() returns `false` when the # file is already locked. - File.open(lockfile, "w") do |f| + File.open(lock_path, "w") do |f| assert_equal false, f.flock(File::LOCK_EX|File::LOCK_NB) end end # Lock should be release by now. In that case, flock # must return 0. - File.open(lockfile, "w") do |f| + File.open(lock_path, "w") do |f| assert_equal 0, f.flock(File::LOCK_EX|File::LOCK_NB) end end From f3838cb5a1058e49ae59a4ea44743a7162e7d430 Mon Sep 17 00:00:00 2001 From: Fujimoto Seiji Date: Fri, 1 Jul 2022 15:34:36 +0900 Subject: [PATCH 09/10] Fix variable name Signed-off-by: Fujimoto Seiji --- lib/fluent/plugin/base.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb index 699e2a32eb..1846273a17 100644 --- a/lib/fluent/plugin/base.rb +++ b/lib/fluent/plugin/base.rb @@ -31,10 +31,10 @@ class Base def initialize @log = nil super + @fluentd_lock_dir = ENV['FLUENTD_LOCK_DIR'] @_state = State.new(false, false, false, false, false, false, false, false, false) @_context_router = nil @_fluentd_worker_id = nil - @_fluentd_lock_dir = ENV['FLUENTD_LOCK_DIR'] @under_plugin_development = false end @@ -73,11 +73,11 @@ def multi_workers_ready? def get_lock_path(name) name = name.gsub(/[^a-zA-Z0-9]/, "_") - File.join(@_fluentd_lock_dir, "fluentd-#{name}.lock") + File.join(@fluentd_lock_dir, "fluentd-#{name}.lock") end def acquire_worker_lock(name) - if @_fluentd_lock_dir.nil? + if @fluentd_lock_dir.nil? raise InvalidLockDirectory, "can't acquire lock because FLUENTD_LOCKDIR isn't set" end File.open(get_lock_path(name), "w") do |f| From 2c9f3f366fe8ebd95031545d767576f59ad88404 Mon Sep 17 00:00:00 2001 From: Fujimoto Seiji Date: Fri, 1 Jul 2022 15:56:53 +0900 Subject: [PATCH 10/10] Add more tests for inter-worker locking Signed-off-by: Fujimoto Seiji --- test/plugin/test_base.rb | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/test/plugin/test_base.rb b/test/plugin/test_base.rb index 32f547a467..b9567c9228 100644 --- a/test/plugin/test_base.rb +++ b/test/plugin/test_base.rb @@ -114,7 +114,18 @@ class FluentPluginBaseTest::DummyPlugin2 < Fluent::Plugin::TestBase assert{ logger.logs.first.include?("invalid byte sequence is replaced in ") } end - test 'acquire worker lock' do + test 'generates worker lock path safely' do + Dir.mktmpdir("test-fluentd-lock-") do |lock_dir| + ENV['FLUENTD_LOCK_DIR'] = lock_dir + p = FluentPluginBaseTest::DummyPlugin.new + path = p.get_lock_path("Aa\\|=~/_123"); + + assert_equal lock_dir, File.dirname(path) + assert_equal "fluentd-Aa______123.lock", File.basename(path) + end + end + + test 'can acquire inter-worker locking' do Dir.mktmpdir("test-fluentd-lock-") do |lock_dir| ENV['FLUENTD_LOCK_DIR'] = lock_dir p = FluentPluginBaseTest::DummyPlugin.new