Skip to content

Commit

Permalink
Merge pull request #1413 from fluent/ensure-to-produce-tags-in-str
Browse files Browse the repository at this point in the history
Ensure to write str(not bin) objects, for compatibility reason
  • Loading branch information
tagomoris authored Jan 11, 2017
2 parents 19ecdd0 + 7b7622a commit 63531b7
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 7 deletions.
20 changes: 13 additions & 7 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -536,16 +536,22 @@ def send_data_actual(sock, tag, chunk)
option = { 'size' => chunk.size, 'compressed' => @compress }
option['chunk'] = Base64.encode64(chunk.unique_id) if @sender.require_ack_response

# out_forward always uses Raw32 type for content.
# Raw16 can store only 64kbytes, and it should be much smaller than buffer chunk size.
# https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#packedforward-mode
# out_forward always uses str32 type for entries.
# str16 can store only 64kbytes, and it should be much smaller than buffer chunk size.

sock.write @sender.forward_header # beginArray(3)
sock.write tag.to_msgpack # 1. writeRaw(tag)
tag = tag.dup.force_encoding(Encoding::UTF_8)

sock.write @sender.forward_header # array, size=3
sock.write tag.to_msgpack # 1. tag: String (str)
chunk.open(compressed: @compress) do |chunk_io|
sock.write [0xdb, chunk_io.size].pack('CN') # 2. beginRaw(size) raw32
IO.copy_stream(chunk_io, sock) # writeRawBody(packed_es)
entries = [0xdb, chunk_io.size].pack('CN')
sock.write entries.force_encoding(Encoding::UTF_8) # 2. entries: String (str32)
IO.copy_stream(chunk_io, sock) # writeRawBody(packed_es)
end
sock.write option.to_msgpack # 3. writeOption(option)
sock.write option.to_msgpack # 3. option: Hash(map)

# TODO: use bin32 for non-utf8 content(entries) when old msgpack-ruby (0.5.x or earlier) not supported
end

def send_data(tag, chunk)
Expand Down
34 changes: 34 additions & 0 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,40 @@ def read_ack_from_sock(sock, unpacker)
assert_equal 2, d.instance.ack_response_timeout
end

test 'send tags in str (utf-8 strings)' do
target_input_driver = create_target_input_driver

@d = d = create_driver(CONFIG + %[flush_interval 1s])

time = event_time("2011-01-02 13:14:15 UTC")

tag_in_utf8 = "test.utf8".encode("utf-8")
tag_in_ascii = "test.ascii".encode("ascii-8bit")

emit_events = [
[tag_in_utf8, time, {"a" => 1}],
[tag_in_ascii, time, {"a" => 2}],
]

target_input_driver.run(expect_records: 2) do
d.run do
emit_events.each do |tag, time, record|
d.feed(tag, time, record)
end
end
end

events = target_input_driver.events
assert_equal_event_time(time, events[0][1])
assert_equal ['test.utf8', time, emit_events[0][2]], events[0]
assert_equal Encoding::UTF_8, events[0][0].encoding
assert_equal_event_time(time, events[1][1])
assert_equal ['test.ascii', time, emit_events[1][2]], events[1]
assert_equal Encoding::UTF_8, events[1][0].encoding

assert_empty d.instance.exceptions
end

test 'send_with_time_as_integer' do
target_input_driver = create_target_input_driver

Expand Down

0 comments on commit 63531b7

Please sign in to comment.