diff --git a/lib/fluent/plugin/in_exec.rb b/lib/fluent/plugin/in_exec.rb
index f7728ab..b62025c 100644
--- a/lib/fluent/plugin/in_exec.rb
+++ b/lib/fluent/plugin/in_exec.rb
@@ -1,24 +1,122 @@
#
-# Copyright 2024- Daijiro Fukuda
+# Fluentd
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-require "fluent/plugin/input"
-
-module Fluent
- module Plugin
- class ExecInput < Fluent::Plugin::Input
- Fluent::Plugin.register_input("exec", self)
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'fluent/plugin/input'
+require 'yajl'
+
+module Fluent::Plugin
+ class ExecInput < Fluent::Plugin::Input
+ Fluent::Plugin.register_input('exec', self)
+
+ helpers :compat_parameters, :extract, :parser, :child_process
+
+ desc 'The command (program) to execute.'
+ config_param :command, :string
+ desc 'Specify connect mode to executed process'
+ config_param :connect_mode, :enum, list: [:read, :read_with_stderr], default: :read
+
+ config_section :parse do
+ config_set_default :@type, 'tsv'
+ config_set_default :time_type, :float
+ config_set_default :time_key, nil
+ config_set_default :estimate_current_event, false
+ end
+
+ config_section :extract do
+ config_set_default :time_type, :float
+ end
+
+ desc 'Tag of the output events.'
+ config_param :tag, :string, default: nil
+ desc 'The interval time between periodic program runs.'
+ config_param :run_interval, :time, default: nil
+ desc 'The default block size to read if parser requires partial read.'
+ config_param :read_block_size, :size, default: 10240 # 10k
+ desc 'The encoding to receive the result of the command, especially for non-ascii characters.'
+ config_param :encoding, :string, default: nil
+
+ attr_reader :parser
+
+ def configure(conf)
+ compat_parameters_convert(conf, :extract, :parser)
+ ['parse', 'extract'].each do |subsection_name|
+ if subsection = conf.elements(subsection_name).first
+ if subsection.has_key?('time_format')
+ subsection['time_type'] ||= 'string'
+ end
+ end
+ end
+
+ super
+
+ if !@tag && (!@extract_config || !@extract_config.tag_key)
+ raise Fluent::ConfigError, "'tag' or 'tag_key' option is required on exec input"
+ end
+ validate_encoding(@encoding) if @encoding
+ @parser = parser_create
+ end
+
+ def validate_encoding(encoding)
+ Encoding.find(encoding)
+ rescue ArgumentError => e
+ raise Fluent::ConfigError, e.message
+ end
+
+ def multi_workers_ready?
+ true
+ end
+
+ def start
+ super
+
+ options = { mode: [@connect_mode] }
+ options[:external_encoding] = @encoding if @encoding
+
+ if @run_interval
+ child_process_execute(:exec_input, @command, interval: @run_interval, **options, &method(:run))
+ else
+ child_process_execute(:exec_input, @command, immediate: true, **options, &method(:run))
+ end
+ end
+
+ def run(io)
+ case
+ when @parser.implement?(:parse_io)
+ @parser.parse_io(io, &method(:on_record))
+ when @parser.implement?(:parse_partial_data)
+ until io.eof?
+ @parser.parse_partial_data(io.readpartial(@read_block_size), &method(:on_record))
+ end
+ when @parser.parser_type == :text_per_line
+ io.each_line do |line|
+ @parser.parse(line.chomp, &method(:on_record))
+ end
+ else
+ @parser.parse(io.read, &method(:on_record))
+ end
+ end
+
+ def on_record(time, record)
+ tag = extract_tag_from_record(record)
+ tag ||= @tag
+ time ||= extract_time_from_record(record) || Fluent::EventTime.now
+ router.emit(tag, time, record)
+ rescue => e
+ log.error "exec failed to emit", tag: tag, record: Yajl.dump(record), error: e
+ router.emit_error_event(tag, time, record, e) if tag && time && record
end
end
end
diff --git a/test/plugin/test_in_exec.rb b/test/plugin/test_in_exec.rb
index 33638af..9cd378f 100644
--- a/test/plugin/test_in_exec.rb
+++ b/test/plugin/test_in_exec.rb
@@ -6,8 +6,39 @@ class ExecInputTest < Test::Unit::TestCase
Fluent::Test.setup
end
- test "failure" do
- flunk
+ data(immediate: "")
+ data(run_interval: "run_interval 1")
+ test 'can handle non-ascii characters' do |additional_setting|
+ content = 'ひらがな漢字'
+
+ d = create_driver %[
+ command ruby -e "puts '#{content}'"
+ tag test
+ encoding utf-8
+
+ @type none
+
+ #{additional_setting}
+ ]
+
+ d.run(expect_records: 1, timeout: 10)
+
+ assert_equal 1, d.events.length
+ tag, time, record = d.events.first
+ assert_equal({"message" => content}, record)
+ end
+
+ test 'raise ConfigError for invalid encoding' do
+ assert_raise Fluent::ConfigError do
+ d = create_driver %[
+ command ruby -e "puts foo"
+ tag test
+ encoding invalid-encode
+
+ @type none
+
+ ]
+ end
end
private