Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Root directory per worker process #1374

Merged
merged 15 commits into from
Dec 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@
# limitations under the License.
#

require 'socket'

require 'cool.io'

require 'fluent/config'
require 'fluent/event'
require 'fluent/event_router'
Expand Down Expand Up @@ -58,8 +54,6 @@ def initialize
def init(system_config)
@system_config = system_config

BasicSocket.do_not_reverse_lookup = true

suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil?
@suppress_config_dump = system_config.suppress_config_dump unless system_config.suppress_config_dump.nil?
@without_source = system_config.without_source unless system_config.without_source.nil?
Expand Down Expand Up @@ -172,7 +166,8 @@ def log_event_loop

def run
begin
$log.info "starting fluentd worker", pid: Process.pid, ppid: Process.ppid # TODO: worker number
worker_id = ENV['SERVERENGINE_WORKER_ID']
$log.info "starting fluentd worker", pid: Process.pid, ppid: Process.ppid, worker: worker_id
start

if @event_router.match?($log.tag)
Expand Down
5 changes: 0 additions & 5 deletions lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -433,11 +433,6 @@ def configure(conf)
end
end

def start
@log.reset
super
end

def terminate
super
@log.reset
Expand Down
11 changes: 11 additions & 0 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,24 @@ def initialize
super
@_state = State.new(false, false, false, false, false, false, false, false, false)
@_context_router = nil
@_fluentd_worker_id = nil
@under_plugin_development = false
end

def has_router?
false
end

def plugin_root_dir
nil # override this in plugin_id.rb
end

def fluentd_worker_id
return @_fluentd_worker_id if @_fluentd_worker_id
@_fluentd_worker_id = (ENV['SERVERENGINE_WORKER_ID'] || 0).to_i
@_fluentd_worker_id
end

def configure(conf)
super
@_state ||= State.new(false, false, false, false, false, false, false, false, false)
Expand Down
16 changes: 9 additions & 7 deletions lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,15 @@ class FileBuffer < Fluent::Plugin::Buffer

DIR_PERMISSION = 0755

# TODO: buffer_path based on system config
desc 'The path where buffer chunks are stored.'
config_param :path, :string
config_param :path, :string, default: nil

config_set_default :chunk_limit_size, DEFAULT_CHUNK_LIMIT_SIZE
config_set_default :total_limit_size, DEFAULT_TOTAL_LIMIT_SIZE

config_param :file_permission, :string, default: nil # '0644'
config_param :dir_permission, :string, default: nil # '0755'

##TODO: Buffer plugin cannot handle symlinks because new API @stage has many writing buffer chunks
## re-implement this feature on out_file, w/ enqueue_chunk(or generate_chunk) hook + chunk.path
# attr_accessor :symlink_path

@@buffer_paths = {}

def initialize
Expand All @@ -56,6 +51,14 @@ def initialize
def configure(conf)
super

unless @path
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future, should we validate @path value for avoding conflict trouble?
Fixed path should not allowed with multiprocess configuration or
support %{worker_id}/%{plugin_id} placeholder?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Buffer chunks have unique_id, which are generated with time and random value.
So it SHOULD NOT conflict with other chunks in other processes. No need to do such thing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhh, there are some problem about resuming at startup...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway, plugin_id can't help that situation. plugin_id is (should be) unique in a worker process, not server global.

if root_dir = owner.plugin_root_dir
@path = File.join(root_dir, 'buffer')
else
raise Fluent::ConfigError, "buffer path is not configured. specify 'path' in <buffer>"
end
end

type_of_owner = Plugin.lookup_type_from_class(@_owner.class)
if @@buffer_paths.has_key?(@path) && !buffer_path_for_test?
type_using_this_path = @@buffer_paths[@path]
Expand All @@ -64,7 +67,6 @@ def configure(conf)

@@buffer_paths[@path] = type_of_owner

# TODO: create buffer path with plugin_id, under directory specified by system config
if File.exist?(@path)
if File.directory?(@path)
@path = File.join(@path, 'buffer.*.log')
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def configure(conf)
raise Fluent::ConfigError, "tail: 'path' parameter is required on tail input"
end

# TODO: Use plugin_root_dir and storage plugin to store positions if available
unless @pos_file
$log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
$log.warn "this parameter is highly recommended to save the position to resume tailing."
Expand Down
29 changes: 16 additions & 13 deletions lib/fluent/plugin/storage_local.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@ class LocalStorage < Storage
DEFAULT_FILE_MODE = 0644

config_param :path, :string, default: nil
config_param :mode, :integer, default: DEFAULT_FILE_MODE
config_param :dir_mode, :integer, default: DEFAULT_DIR_MODE
config_param :mode, default: DEFAULT_FILE_MODE do |v|
v.to_i(8)
end
config_param :dir_mode, default: DEFAULT_DIR_MODE do |v|
v.to_i(8)
end
config_param :pretty_print, :bool, default: false

attr_reader :store # for test

def initialize
super
@store = {}
Expand All @@ -42,9 +48,13 @@ def configure(conf)
super

@on_memory = false
if !@path && !@_plugin_id_configured
if @path
# use it
elsif root_dir = owner.plugin_root_dir
@path = File.join(root_dir, 'storage.json')
else
if @persistent
raise Fluent::ConfigError, "Plugin @id or path for <storage> required to save data"
raise Fluent::ConfigError, "Plugin @id or path for <storage> required when 'persistent' is true"
else
if @autosave
log.warn "both of Plugin @id and path for <storage> are not specified. Using on-memory store."
Expand All @@ -53,18 +63,11 @@ def configure(conf)
end
@on_memory = true
end
elsif @path
# ok
else # @_plugin_id_configured is true
log.warn "path for <storage> is not specified. Using on-memory store temporarily, but will use file store after support global storage path"
@on_memory = true
## TODO: get process-wide directory for plugin storage, and generate path for this plugin storage instance
# path =
end

if !@on_memory
dir = File.dirname(@path)
FileUtils.mkdir_p(dir, mode: @dir_mode) unless File.exist?(dir)
FileUtils.mkdir_p(dir, mode: @dir_mode) unless Dir.exist?(dir)
if File.exist?(@path)
raise Fluent::ConfigError, "Plugin storage path '#{@path}' is not readable/writable" unless File.readable?(@path) && File.writable?(@path)
begin
Expand All @@ -75,7 +78,7 @@ def configure(conf)
raise Fluent::ConfigError, "Unexpected error: failed to read data from plugin storage file: '#{@path}'"
end
else
raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{dir}'" unless File.writable?(dir)
raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{@path}'" unless File.stat(dir).writable?
end
end
end
Expand Down
30 changes: 21 additions & 9 deletions lib/fluent/plugin_helper/storage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ module Storage
StorageState = Struct.new(:storage, :running)

def storage_create(usage: '', type: nil, conf: nil, default_type: nil)
if conf && !conf.arg.empty?
usage = conf.arg
end

s = @_storages[usage]
if s && s.running
return s.storage
Expand Down Expand Up @@ -72,7 +76,7 @@ def storage_create(usage: '', type: nil, conf: nil, default_type: nil)
module StorageParams
include Fluent::Configurable
# minimum section definition to instantiate storage plugin instances
config_section :storage, required: false, multi: true, param_name: :storage_configs do
config_section :storage, required: false, multi: true, param_name: :storage_configs, init: true do
config_argument :usage, :string, default: ''
config_param :@type, :string, default: Fluent::Plugin::Storage::DEFAULT_TYPE
end
Expand Down Expand Up @@ -194,6 +198,10 @@ def initialize(storage)
def_delegators :@storage, :start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate
def_delegators :@storage, :started?, :stopped?, :before_shutdown?, :shutdown?, :after_shutdown?, :closed?, :terminated?

def method_missing(name, *args)
@monitor.synchronize{ @storage.__send__(name, *args) }
end

def persistent_always?
true
end
Expand Down Expand Up @@ -274,14 +282,18 @@ class SynchronizeWrapper

def initialize(storage)
@storage = storage
@mutex = Mutex.new
@monitor = Monitor.new
end

def_delegators :@storage, :persistent, :autosave, :autosave_interval, :save_at_shutdown
def_delegators :@storage, :persistent_always?
def_delegators :@storage, :start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate
def_delegators :@storage, :started?, :stopped?, :before_shutdown?, :shutdown?, :after_shutdown?, :closed?, :terminated?

def method_missing(name, *args)
@monitor.synchronize{ @storage.__send__(name, *args) }
end

def synchronized?
true
end
Expand All @@ -291,35 +303,35 @@ def implementation
end

def load
@mutex.synchronize do
@monitor.synchronize do
@storage.load
end
end

def save
@mutex.synchronize do
@monitor.synchronize do
@storage.save
end
end

def get(key)
@mutex.synchronize{ @storage.get(key) }
@monitor.synchronize{ @storage.get(key) }
end

def fetch(key, defval)
@mutex.synchronize{ @storage.fetch(key, defval) }
@monitor.synchronize{ @storage.fetch(key, defval) }
end

def put(key, value)
@mutex.synchronize{ @storage.put(key, value) }
@monitor.synchronize{ @storage.put(key, value) }
end

def delete(key)
@mutex.synchronize{ @storage.delete(key) }
@monitor.synchronize{ @storage.delete(key) }
end

def update(key, &block)
@mutex.synchronize do
@monitor.synchronize do
v = block.call(@storage.get(key))
@storage.put(key, v)
v
Expand Down
17 changes: 17 additions & 0 deletions lib/fluent/plugin_id.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ module Fluent
module PluginId
@@configured_ids = Set.new

def initialize
super
@_plugin_root_dir = nil
end

def configure(conf)
@id = conf['@id']
@_id_configured = !!@id # plugin id is explicitly configured by users (or not)
Expand Down Expand Up @@ -59,5 +64,17 @@ def plugin_id
"object:#{object_id.to_s(16)}"
end
end

def plugin_root_dir
return @_plugin_root_dir if @_plugin_root_dir
return nil unless system_config.root_dir
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put log.warn is better?
Maybe, users hard to find the cause why buffer_path/storage is not configured automatically.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's not about users.
This code is called by plugins, and plugin should check whether plugin_root_dir is available or not by return value, and raise errors with messages which shows how to configure that plugin.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason of this implementation is, plugin MAY use any default values (paths or any others) if root directory is unavailable.

return nil unless plugin_id_configured?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


# Fluent::Plugin::Base#fluentd_worker_id
dir = File.join(system_config.root_dir, "worker#{fluentd_worker_id}", plugin_id)
FileUtils.mkdir_p(dir) unless Dir.exist?(dir)
@_plugin_root_dir = dir.freeze
dir
end
end
end
27 changes: 25 additions & 2 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

require 'etc'
require 'fcntl'
require 'fileutils'

require 'fluent/config'
require 'fluent/env'
Expand Down Expand Up @@ -192,7 +193,10 @@ def supervisor_get_dump_config_handler
module WorkerModule
def spawn(process_manager)
main_cmd = config[:main_cmd]
@pm = process_manager.spawn(*main_cmd)
env = {
'SERVERENGINE_WORKER_ID' => @worker_id.to_i.to_s,
}
@pm = process_manager.spawn(env, *main_cmd)
end

def after_start
Expand Down Expand Up @@ -226,6 +230,7 @@ def self.load_config(path, params = {})
fluentd_conf = Fluent::Config.parse(config_data, config_fname, config_basedir, params['use_v1_config'])
system_config = SystemConfig.create(fluentd_conf)

root_dir = system_config.root_dir || params['root_dir']
log_level = system_config.log_level || params['log_level']
suppress_repeated_stacktrace = system_config.suppress_repeated_stacktrace || params['suppress_repeated_stacktrace']
log_path = params['log_path']
Expand Down Expand Up @@ -264,6 +269,7 @@ def self.load_config(path, params = {})
auto_heartbeat: false,
unrecoverable_exit_codes: [2],
stop_immediately_at_unrecoverable_exit: true,
root_dir: root_dir,
logger: logger,
log: logger.out,
log_path: log_path,
Expand Down Expand Up @@ -365,6 +371,7 @@ def self.default_options
setup_path: nil,
chuser: nil,
chgroup: nil,
root_dir: nil,
suppress_interval: 0,
suppress_repeated_stacktrace: true,
without_source: false,
Expand Down Expand Up @@ -393,6 +400,7 @@ def initialize(opt)
@rpc_server = nil
@process_name = nil

@root_dir = opt[:root_dir]
@log_level = opt[:log_level]
@log_rotate_age = opt[:log_rotate_age]
@log_rotate_size = opt[:log_rotate_size]
Expand All @@ -417,6 +425,20 @@ def run_supervisor
read_config
set_system_config

if @root_dir
if File.exist?(@root_dir)
unless Dir.exist?(@root_dir)
raise Fluent::InvalidRootDirectory, "non directory entry exists:#{@root_dir}"
end
else
begin
FileUtils.mkdir_p(@root_dir)
rescue => e
raise Fluent::InvalidRootDirectory, "failed to create root directory:#{@root_dir}, #{e.inspect}"
end
end
end

dry_run if @dry_run
supervise
end
Expand All @@ -426,7 +448,8 @@ def options
'config_path' => @config_path,
'pid_file' => @daemonize,
'plugin_dirs' => @plugin_dirs,
'log_path' => @log_path
'log_path' => @log_path,
'root_dir' => @root_dir,
}
end

Expand Down
Loading