Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for concurrent append in out_file #3808

Closed
wants to merge 10 commits into from
3 changes: 3 additions & 0 deletions lib/fluent/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ def to_s
class InvalidRootDirectory < UnrecoverableError
end

class InvalidLockDirectory < UnrecoverableError
end

# For internal use
class UncatchableError < Exception
end
Expand Down
17 changes: 17 additions & 0 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ def fluentd_worker_id
@_fluentd_worker_id
end

def fluentd_lockdir
@_fluentd_lockdir ||= ENV['FLUENTD_LOCKDIR']
fujimotos marked this conversation as resolved.
Show resolved Hide resolved
end

def configure(conf)
if Fluent::Engine.supervisor_mode || (conf.respond_to?(:for_this_worker?) && conf.for_this_worker?)
workers = if conf.target_worker_ids && !conf.target_worker_ids.empty?
Expand All @@ -70,6 +74,19 @@ def multi_workers_ready?
true
end

def acquire_worker_lock(name, &block)
fujimotos marked this conversation as resolved.
Show resolved Hide resolved
if fluentd_lockdir.nil?
raise InvalidLockDirectory, "can't acquire lock because FLUENTD_LOCKDIR isn't set"
end

name = name.gsub(/[^a-zA-Z0-9]/, "_")
lockfile = "fluentd-#{name}.lock"
File.open(File.join(fluentd_lockdir, lockfile), "w") do |f|
f.flock(File::LOCK_EX)
yield
end
end

def string_safe_encoding(str)
unless str.valid_encoding?
str = str.scrub('?')
Expand Down
12 changes: 11 additions & 1 deletion lib/fluent/plugin/out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ def configure(conf)
condition = Gem::Dependency.new('', [">= 2.7.0", "< 3.1.0"])
@need_ruby_on_macos_workaround = true if condition.match?('', RUBY_VERSION)
end

if @need_lock && @append && fluentd_lockdir.nil?
raise InvalidLockDirectory, "must set FLUENTD_LOCKDIR on multi-worker append mode"
end
end

def multi_workers_ready?
Expand Down Expand Up @@ -217,7 +221,13 @@ def write(chunk)
end

if @append
writer.call(path, chunk)
if @need_lock
acquire_worker_lock(path) do
writer.call(path, chunk)
end
else
writer.call(path, chunk)
end
else
find_filepath_available(path, with_lock: @need_lock) do |actual_path|
writer.call(actual_path, chunk)
Expand Down
6 changes: 5 additions & 1 deletion lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,11 @@ def supervise
se = ServerEngine.create(ServerModule, WorkerModule){
Fluent::Supervisor.load_config(@config_path, params)
}
se.run

Dir.mktmpdir("fluentd-lock-") do |lockdir|
ENV['FLUENTD_LOCKDIR'] = lockdir
se.run
end
end

def install_main_process_signal_handlers
Expand Down