Skip to content

Commit

Permalink
fluent-cat: support secondary file
Browse files Browse the repository at this point in the history
https://docs.fluentd.org/output/secondary_file#how-to-resend-secondary-file
explains that it supports secondary file to resend,
but it get error when [Fluent::EventTime, Hash] format is given.

  Input must be a map (got Array)

In this commit, check record content and support [Fluent::EventTime,
Hash] format too.

Note that fluent-cat ignore timestamp which is stored in secondary
records. (it is intended behavior for keeping consistency)

Signed-off-by: Kentaro Hayashi <[email protected]>
  • Loading branch information
kenhys committed May 14, 2021
1 parent 5bd3346 commit 4ddd50c
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 3 deletions.
22 changes: 19 additions & 3 deletions lib/fluent/command/cat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,30 @@ def initialize(tag, connector, time_as_integer: false, retry_limit: 5)
super()
end

def secondary_record?(record)
record.class != Hash &&
record.size == 2 &&
record.first.class == Fluent::EventTime &&
record.last.class == Hash
end

def write(record)
if record.class != Hash
raise ArgumentError, "Input must be a map (got #{record.class})"
unless secondary_record?(record)
if record.class != Hash
raise ArgumentError, "Input must be a map (got #{record.class})"
end
end

time = Fluent::EventTime.now
time = time.to_i if @time_as_integer
entry = [time, record]
entry = if secondary_record?(record)
# Even though secondary contains Fluent::EventTime in record,
# fluent-cat just ignore it and set Fluent::EventTime.now instead.
# This specification is adopted to keep consistency.
[time, record.last]
else
[time, record]
end
synchronize {
unless write_impl([entry])
# write failed
Expand Down
96 changes: 96 additions & 0 deletions test/command/test_cat.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
require_relative '../helper'

require 'test-unit'
require 'open3'
require 'fluent/plugin/output'
require 'fluent/plugin/in_forward'
require 'fluent/plugin/out_secondary_file'
require 'fluent/test/driver/output'
require 'fluent/test/driver/input'

class TestFluentCat < ::Test::Unit::TestCase
def setup
Fluent::Test.setup
FileUtils.mkdir_p(TMP_DIR)
@record = { 'key' => 'value' }
@time = event_time
@es = Fluent::OneEventStream.new(@time, @record)
@primary = create_primary
metadata = @primary.buffer.new_metadata
@chunk = create_chunk(@primary, metadata, @es)
end

def teardown
FileUtils.rm_rf(TMP_DIR)
end

TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/command/fluent_cat#{ENV['TEST_ENV_NUMBER']}")
FLUENT_CAT_COMMAND = File.expand_path(File.dirname(__FILE__) + "/../../bin/fluent-cat")

PORT = unused_port
CONFIG = %[
port #{PORT}
bind 127.0.0.1
]

SECONDARY_CONFIG = %[
directory #{TMP_DIR}
]

class DummyOutput < Fluent::Plugin::Output
def write(chunk); end
end

def create_driver(conf=CONFIG)
Fluent::Test::Driver::Input.new(Fluent::Plugin::ForwardInput).configure(conf)
end

def create_primary(buffer_cofig = config_element('buffer'))
DummyOutput.new.configure(config_element('ROOT','',{}, [buffer_cofig]))
end

def create_secondary_driver(conf=SECONDARY_CONFIG)
c = Fluent::Test::Driver::Output.new(Fluent::Plugin::SecondaryFileOutput)
c.instance.acts_as_secondary(@primary)
c.configure(conf)
end

def create_chunk(primary, metadata, es)
primary.buffer.generate_chunk(metadata).tap do |c|
c.concat(es.to_msgpack_stream, es.size)
c.commit
end
end

sub_test_case "json" do
def test_cat_json
d = create_driver
d.run(expect_records: 1) do
Open3.pipeline_w("#{FLUENT_CAT_COMMAND} --port #{PORT} json") do |stdin|
stdin.puts('{"key":"value"}')
stdin.close
end
end
event = d.events.first
assert_equal([1, "json", @record],
[d.events.size, event.first, event.last])
end
end

sub_test_case "msgpack" do
def test_cat_secondary_file
d = create_secondary_driver
path = d.instance.write(@chunk)
d = create_driver
d.run(expect_records: 1) do
Open3.pipeline_w("#{FLUENT_CAT_COMMAND} --port #{PORT} --format msgpack secondary") do |stdin|
stdin.write(File.read(path))
stdin.close
end
end
event = d.events.first
assert_equal([1, "secondary", @record],
[d.events.size, event.first, event.last])
end
end
end

0 comments on commit 4ddd50c

Please sign in to comment.