Skip to content

Commit

Permalink
copy code from Fleuntd#4533
Browse files Browse the repository at this point in the history
From fluent/fluentd#4533

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Jun 20, 2024
1 parent 36db352 commit 6ff20a7
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 19 deletions.
132 changes: 115 additions & 17 deletions lib/fluent/plugin/in_exec.rb
Original file line number Diff line number Diff line change
@@ -1,24 +1,122 @@
#
# Copyright 2024- Daijiro Fukuda
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require "fluent/plugin/input"

module Fluent
module Plugin
class ExecInput < Fluent::Plugin::Input
Fluent::Plugin.register_input("exec", self)
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/input'
require 'yajl'

module Fluent::Plugin
class ExecInput < Fluent::Plugin::Input
Fluent::Plugin.register_input('exec', self)

helpers :compat_parameters, :extract, :parser, :child_process

desc 'The command (program) to execute.'
config_param :command, :string
desc 'Specify connect mode to executed process'
config_param :connect_mode, :enum, list: [:read, :read_with_stderr], default: :read

config_section :parse do
config_set_default :@type, 'tsv'
config_set_default :time_type, :float
config_set_default :time_key, nil
config_set_default :estimate_current_event, false
end

config_section :extract do
config_set_default :time_type, :float
end

desc 'Tag of the output events.'
config_param :tag, :string, default: nil
desc 'The interval time between periodic program runs.'
config_param :run_interval, :time, default: nil
desc 'The default block size to read if parser requires partial read.'
config_param :read_block_size, :size, default: 10240 # 10k
desc 'The encoding to receive the result of the command, especially for none-ascii characters.'
config_param :encoding, :string, default: nil

attr_reader :parser

def configure(conf)
compat_parameters_convert(conf, :extract, :parser)
['parse', 'extract'].each do |subsection_name|
if subsection = conf.elements(subsection_name).first
if subsection.has_key?('time_format')
subsection['time_type'] ||= 'string'
end
end
end

super

if !@tag && (!@extract_config || !@extract_config.tag_key)
raise Fluent::ConfigError, "'tag' or 'tag_key' option is required on exec input"
end
validate_encoding(@encoding) if @encoding
@parser = parser_create
end

def validate_encoding(encoding)
Encoding.find(encoding)
rescue ArgumentError => e
raise Fluent::ConfigError, e.message
end

def multi_workers_ready?
true
end

def start
super

options = { mode: [@connect_mode] }
options[:external_encoding] = @encoding if @encoding

if @run_interval
child_process_execute(:exec_input, @command, interval: @run_interval, **options, &method(:run))
else
child_process_execute(:exec_input, @command, immediate: true, **options, &method(:run))
end
end

def run(io)
case
when @parser.implement?(:parse_io)
@parser.parse_io(io, &method(:on_record))
when @parser.implement?(:parse_partial_data)
until io.eof?
@parser.parse_partial_data(io.readpartial(@read_block_size), &method(:on_record))
end
when @parser.parser_type == :text_per_line
io.each_line do |line|
@parser.parse(line.chomp, &method(:on_record))
end
else
@parser.parse(io.read, &method(:on_record))
end
end

def on_record(time, record)
tag = extract_tag_from_record(record)
tag ||= @tag
time ||= extract_time_from_record(record) || Fluent::EventTime.now
router.emit(tag, time, record)
rescue => e
log.error "exec failed to emit", tag: tag, record: Yajl.dump(record), error: e
router.emit_error_event(tag, time, record, e) if tag && time && record
end
end
end
35 changes: 33 additions & 2 deletions test/plugin/test_in_exec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,39 @@ class ExecInputTest < Test::Unit::TestCase
Fluent::Test.setup
end

test "failure" do
flunk
data(immediate: "")
data(run_interval: "run_interval 1")
test 'can handle none-ascii characters' do |additional_setting|
content = 'ひらがな漢字'

d = create_driver %[
command ruby -e "puts '#{content}'"
tag test
encoding utf-8
<parse>
@type none
</parse>
#{additional_setting}
]

d.run(expect_records: 1, timeout: 10)

assert_equal 1, d.events.length
tag, time, record = d.events.first
assert_equal({"message" => content}, record)
end

test 'raise ConfigError for invalid encoding' do
assert_raise Fluent::ConfigError do
d = create_driver %[
command ruby -e "puts foo"
tag test
encoding invalid-encode
<parse>
@type none
</parse>
]
end
end

private
Expand Down

0 comments on commit 6ff20a7

Please sign in to comment.