Skip to content

Commit

Permalink
Add with-source-only feature
Browse files Browse the repository at this point in the history
* Mainly intended to be used by command option `--with-source-only`
* We can use system-config option `with_source_only` as well
* Not supported on Windows

It launches Fluentd with Input plugins only.
Here is the specification.

* Those Input plugins emits the events to SourceOnlyBufferAgent.
* The events is kept in the buf_file during the source-only-mode.
* Send SIGWINCH to the supervisor to cancels the mode.
* After canceled, the new agent starts to load the buffer.

Co-authored-by: Shizuo Fujita <[email protected]>
Co-authored-by: Kentaro Hayashi <[email protected]>
Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
3 people committed Nov 21, 2024
1 parent eebc7e1 commit 2a857dc
Show file tree
Hide file tree
Showing 18 changed files with 935 additions and 52 deletions.
6 changes: 6 additions & 0 deletions lib/fluent/command/fluentd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@
cmd_opts[:without_source] = b
}

unless Fluent.windows?
op.on('--with-source-only', "Invoke a fluentd only with input plugins. The data is stored in a temporary buffer. Send SIGWINCH to cancel this mode and process the data (Not supported on Windows).", TrueClass) {|b|
cmd_opts[:with_source_only] = b
}
end

op.on('--config-file-type VALU', 'guessing file type of fluentd configuration. yaml/yml or guess') { |s|
if (s == 'yaml') || (s == 'yml')
cmd_opts[:config_file_type] = s.to_sym
Expand Down
78 changes: 47 additions & 31 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def initialize
@system_config = SystemConfig.new

@supervisor_mode = false

@root_agent_mutex = Mutex.new
end

MAINLOOP_SLEEP_INTERVAL = 0.3
Expand Down Expand Up @@ -133,7 +135,15 @@ def emit_stream(tag, es)
end

def flush!
@root_agent.flush!
@root_agent_mutex.synchronize do
@root_agent.flush!
end
end

def cancel_source_only!
@root_agent_mutex.synchronize do
@root_agent.cancel_source_only!
end
end

def now
Expand All @@ -144,7 +154,9 @@ def now
def run
begin
$log.info "starting fluentd worker", pid: Process.pid, ppid: Process.ppid, worker: worker_id
start
@root_agent_mutex.synchronize do
start
end

@fluent_log_event_router.start

Expand All @@ -158,47 +170,51 @@ def run
raise
end

stop_phase(@root_agent)
@root_agent_mutex.synchronize do
stop_phase(@root_agent)
end
end

# @param conf [Fluent::Config]
# @param supervisor [Bool]
# @reutrn nil
def reload_config(conf, supervisor: false)
# configure first to reduce down time while restarting
new_agent = RootAgent.new(log: log, system_config: @system_config)
ret = Fluent::StaticConfigAnalysis.call(conf, workers: system_config.workers)

ret.all_plugins.each do |plugin|
if plugin.respond_to?(:reloadable_plugin?) && !plugin.reloadable_plugin?
raise Fluent::ConfigError, "Unreloadable plugin plugin: #{Fluent::Plugin.lookup_type_from_class(plugin.class)}, plugin_id: #{plugin.plugin_id}, class_name: #{plugin.class})"
@root_agent_mutex.synchronize do
# configure first to reduce down time while restarting
new_agent = RootAgent.new(log: log, system_config: @system_config)
ret = Fluent::StaticConfigAnalysis.call(conf, workers: system_config.workers)

ret.all_plugins.each do |plugin|
if plugin.respond_to?(:reloadable_plugin?) && !plugin.reloadable_plugin?
raise Fluent::ConfigError, "Unreloadable plugin plugin: #{Fluent::Plugin.lookup_type_from_class(plugin.class)}, plugin_id: #{plugin.plugin_id}, class_name: #{plugin.class})"
end
end
end

# Assign @root_agent to new root_agent
# for https://github.com/fluent/fluentd/blob/fcef949ce40472547fde295ddd2cfe297e1eddd6/lib/fluent/plugin_helper/event_emitter.rb#L50
old_agent, @root_agent = @root_agent, new_agent
begin
@root_agent.configure(conf)
rescue
@root_agent = old_agent
raise
end
# Assign @root_agent to new root_agent
# for https://github.com/fluent/fluentd/blob/fcef949ce40472547fde295ddd2cfe297e1eddd6/lib/fluent/plugin_helper/event_emitter.rb#L50
old_agent, @root_agent = @root_agent, new_agent
begin
@root_agent.configure(conf)
rescue
@root_agent = old_agent
raise
end

unless @suppress_config_dump
$log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}"
end
unless @suppress_config_dump
$log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}"
end

# supervisor doesn't handle actual data. so the following code is unnecessary.
if supervisor
old_agent.shutdown # to close thread created in #configure
return
end
# supervisor doesn't handle actual data. so the following code is unnecessary.
if supervisor
old_agent.shutdown # to close thread created in #configure
return
end

stop_phase(old_agent)
stop_phase(old_agent)

$log.info 'restart fluentd worker', worker: worker_id
start_phase(new_agent)
$log.info 'restart fluentd worker', worker: worker_id
start_phase(new_agent)
end
end

def stop
Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/env.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.
#

require 'securerandom'

require 'serverengine/utils'
require 'fluent/oj_options'

Expand All @@ -25,6 +27,7 @@ module Fluent
DEFAULT_OJ_OPTIONS = Fluent::OjOptions.load_env
DEFAULT_DIR_PERMISSION = 0755
DEFAULT_FILE_PERMISSION = 0644
INSTANCE_ID = ENV['FLUENT_INSTANCE_ID'] || SecureRandom.uuid

def self.windows?
ServerEngine.windows?
Expand Down
40 changes: 40 additions & 0 deletions lib/fluent/plugin/out_buffer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/output'

module Fluent::Plugin
class BufferOutput < Output
Fluent::Plugin.register_output("buffer", self)
helpers :event_emitter

config_section :buffer do
config_set_default :@type, "file"
config_set_default :chunk_keys, ["tag"]
config_set_default :flush_mode, :interval
config_set_default :flush_interval, 10
end

def multi_workers_ready?
true
end

def write(chunk)
return if chunk.empty?
router.emit_stream(chunk.metadata.tag, Fluent::MessagePackEventStream.new(chunk.read))
end
end
end
2 changes: 2 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1384,6 +1384,7 @@ def retry_state(randomize)
end

def submit_flush_once
return unless @buffer_config.flush_thread_count > 0
# Without locks: it is rough but enough to select "next" writer selection
@output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count
state = @output_flush_threads[@output_flush_thread_current_position]
Expand All @@ -1406,6 +1407,7 @@ def force_flush
end

def submit_flush_all
return unless @buffer_config.flush_thread_count > 0
while !@retry && @buffer.queued?
submit_flush_once
sleep @buffer_config.flush_thread_burst_interval
Expand Down
12 changes: 12 additions & 0 deletions lib/fluent/plugin_helper/event_emitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ module EventEmitter

def router
@_event_emitter_used_actually = true

return Engine.root_agent.source_only_router if @_event_emitter_force_source_only_router

if @_event_emitter_lazy_init
@router = @primary_instance.router
end
Expand All @@ -48,6 +51,14 @@ def event_emitter_used_actually?
@_event_emitter_used_actually
end

def event_emitter_apply_source_only
@_event_emitter_force_source_only_router = true
end

def event_emitter_cancel_source_only
@_event_emitter_force_source_only_router = false
end

def event_emitter_router(label_name)
if label_name
if label_name == "@ROOT"
Expand All @@ -72,6 +83,7 @@ def initialize
super
@_event_emitter_used_actually = false
@_event_emitter_lazy_init = false
@_event_emitter_force_source_only_router = false
@router = nil
end

Expand Down
Loading

0 comments on commit 2a857dc

Please sign in to comment.