Skip to content

Commit

Permalink
in_tcp: Add message_length_limit
Browse files Browse the repository at this point in the history
Add to `in_tcp` the same `message_length_limit` feature that `in_udp`
already has, so that incoming data that is too large is dropped.

If more data follows a dropped portion, that remainder may fail to parse
and a warning is logged; this change treats that as intended behavior.

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Apr 10, 2023
1 parent 24c8bdd commit 283a3b3
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 0 deletions.
30 changes: 30 additions & 0 deletions lib/fluent/plugin/in_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ class TcpInput < Input
desc "The field name of the client's address."
config_param :source_address_key, :string, default: nil

# Maximum accepted size, in bytes, of a single delimited message.
# The default is nil for backward compatibility: when nil, the size checks
# in this plugin are skipped, so no limit is applied.
desc "The max bytes of message."
config_param :message_length_limit, :size, default: nil

config_param :blocking_timeout, :time, default: 0.5

desc 'The payload is read up to this character.'
Expand Down Expand Up @@ -116,6 +120,11 @@ def start
msg = buf[pos...i]
pos = i + del_size

if !@message_length_limit.nil? && @message_length_limit < msg.bytesize
log.info "The received data is larger than 'message_length_limit', dropped:", limit: @message_length_limit, size: msg.bytesize, head: msg[...32]
next
end

@parser.parse(msg) do |time, record|
unless time && record
log.warn "pattern not matched", message: msg
Expand All @@ -131,6 +140,14 @@ def start
end
end
buf.slice!(0, pos) if pos > 0
# If the buffer size exceeds the limit here, it means that the next message will definitely exceed the limit.
# So we should clear the buffer here. Otherwise, it will keep storing useless data until the next delimiter comes.
# The subsequent data will be incomplete and cannot be parsed, but we consider it a specification.
if !@message_length_limit.nil? && @message_length_limit < buf.bytesize
log.info "The buffer size exceeds 'message_length_limit', cleared:", limit: @message_length_limit, size: buf.bytesize, head: buf[...32]
buf.clear
next
end
end
else
server_create(:in_tcp_server_batch_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key, send_keepalive_packet: @send_keepalive_packet) do |data, conn|
Expand All @@ -147,6 +164,11 @@ def start
msg = buf[pos...i]
pos = i + del_size

if !@message_length_limit.nil? && @message_length_limit < msg.bytesize
log.info "The received data is larger than 'message_length_limit', dropped:", limit: @message_length_limit, size: msg.bytesize, head: msg[...32]
next
end

@parser.parse(msg) do |time, record|
unless time && record
log.warn "pattern not matched", message: msg
Expand All @@ -161,6 +183,14 @@ def start
end
router.emit_stream(@tag, es)
buf.slice!(0, pos) if pos > 0
# If the buffer size exceeds the limit here, it means that the next message will definitely exceed the limit.
# So we should clear the buffer here. Otherwise, it will keep storing useless data until the next delimiter comes.
# The subsequent data will be incomplete and cannot be parsed, but we consider it a specification.
if !@message_length_limit.nil? && @message_length_limit < buf.bytesize
log.info "The buffer size exceeds 'message_length_limit', cleared:", limit: @message_length_limit, size: buf.bytesize, head: buf[...32]
buf.clear
next
end
end
end
end
Expand Down
74 changes: 74 additions & 0 deletions test/plugin/test_in_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,78 @@ def create_tcp_socket(host, port, &block)
assert_equal 'hello', event[2]['msg']
end
end

sub_test_case "message_length_limit" do
  # Every test below runs twice: once with no <extract> section ("batch_emit")
  # and once with an <extract> section ("single_emit").
  data("batch_emit", { extract: "" }, keep: true)
  data("single_emit", { extract: "<extract>\ntag_key tag\n</extract>\n" }, keep: true)

  # Sends three newline-terminated payloads whose lengths are exactly the limit,
  # one byte over, and one byte under. Only the over-limit payload may be dropped.
  test "drop records exceeding limit" do |data|
    message_length_limit = 10
    # The literal's interior is kept flush-left: its whitespace is part of the config text.
    config = base_config + %!
message_length_limit #{message_length_limit}
<parse>
@type none
</parse>
#{data[:extract]}
!
    driver = create_driver(config)

    payloads = [
      "a" * message_length_limit + "\n",
      "b" * (message_length_limit + 1) + "\n",
      "c" * (message_length_limit - 1) + "\n",
    ]

    driver.run(expect_records: 2, timeout: 10) do
      create_tcp_socket('127.0.0.1', @port) do |socket|
        payloads.each { |payload| socket.send(payload, 0) }
      end
    end

    received = driver.events.map { |event| event[2]["message"] }

    assert_equal ["a" * message_length_limit, "c" * (message_length_limit - 1)], received
  end

  # Sends 17 bytes without a delimiter (over the 12-byte limit), pauses so the
  # server can process and clear its buffer, then sends the tail of that message
  # followed by a complete one. The orphaned tail must fail to parse and the
  # complete message must still be emitted.
  test "clear buffer, which results in making an incomplete record" do |data|
    message_length_limit = 12
    # The literal's interior is kept flush-left: its whitespace is part of the config text.
    config = base_config + %!
message_length_limit #{message_length_limit}
delimiter ";"
<parse>
@type json
</parse>
#{data[:extract]}
!
    driver = create_driver(config)

    driver.run(expect_records: 1, timeout: 10) do
      create_tcp_socket('127.0.0.1', @port) do |socket|
        socket.send('{"message":', 0)
        socket.send('"hello', 0)
        sleep 1 # To make the server read data and clear the buffer here.
        socket.send('world!"};', 0) # this will be incomplete and fail to parse
        socket.send('{"k":"v"};', 0) # this will succeed to parse
      end
    end

    # Strip the leading "YYYY-MM-DD HH:MM:SS +ZZZZ " timestamp so log lines compare stably.
    timestamp_prefix = /\A\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} [-+]\d{4} /
    normalized_logs = driver.logs.map { |line| line.gsub(timestamp_prefix, "") }
    emitted_records = driver.events.map { |event| event[2] }

    expected = {
      logs: [
        '[info]: The buffer size exceeds \'message_length_limit\', cleared: limit=12 size=17 head="{\"message\":\"hello"' + "\n",
        '[warn]: pattern not matched message="world!\"}"' + "\n"
      ],
      records: [{"k" => "v"}],
    }
    # The first captured log line is excluded from the comparison
    # (NOTE(review): presumably an unrelated startup message; confirm).
    actual = {
      logs: normalized_logs[1..],
      records: emitted_records,
    }

    assert_equal(expected, actual)
  end
end
end

0 comments on commit 283a3b3

Please sign in to comment.