diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb
index e3b0a60c75..bd2ea83e5b 100644
--- a/lib/fluent/plugin/in_tcp.rb
+++ b/lib/fluent/plugin/in_tcp.rb
@@ -36,6 +36,10 @@ class TcpInput < Input
desc "The field name of the client's address."
config_param :source_address_key, :string, default: nil
+ # Default is nil (no limit), which preserves the behavior from before this option existed.
+ desc "The max bytes of message."
+ config_param :message_length_limit, :size, default: nil
+
config_param :blocking_timeout, :time, default: 0.5
desc 'The payload is read up to this character.'
@@ -102,6 +106,7 @@ def start
log.info "listening tcp socket", bind: @bind, port: @port
del_size = @delimiter.length
+ discard_till_next_delimiter = false
if @_extract_enabled && @_extract_tag_key
server_create(:in_tcp_server_single_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key, send_keepalive_packet: @send_keepalive_packet) do |data, conn|
unless check_client(conn)
@@ -116,6 +121,16 @@ def start
msg = buf[pos...i]
pos = i + del_size
+ if discard_till_next_delimiter
+ discard_till_next_delimiter = false
+ next
+ end
+
+ if !@message_length_limit.nil? && @message_length_limit < msg.bytesize
+ log.info "The received data is larger than 'message_length_limit', dropped:", limit: @message_length_limit, size: msg.bytesize, head: msg[...32]
+ next
+ end
+
@parser.parse(msg) do |time, record|
unless time && record
log.warn "pattern not matched", message: msg
@@ -131,6 +146,15 @@ def start
end
end
buf.slice!(0, pos) if pos > 0
+ # If the buffer size exceeds the limit here, it means that the next message will definitely exceed the limit.
+ # So we should clear the buffer here. Otherwise, it will keep storing useless data until the next delimiter comes.
+ if !@message_length_limit.nil? && @message_length_limit < buf.bytesize
+ log.info "The buffer size exceeds 'message_length_limit', cleared:", limit: @message_length_limit, size: buf.bytesize, head: buf[...32]
+ buf.clear
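+ # Discard the rest of this oversized message, i.e. everything up to the next delimiter. NOTE(review): the flag is a local captured by this callback, so it looks shared by all connections - confirm.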
+ discard_till_next_delimiter = true
+ next
+ end
end
else
server_create(:in_tcp_server_batch_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key, send_keepalive_packet: @send_keepalive_packet) do |data, conn|
@@ -147,6 +171,16 @@ def start
msg = buf[pos...i]
pos = i + del_size
+ if discard_till_next_delimiter
+ discard_till_next_delimiter = false
+ next
+ end
+
+ if !@message_length_limit.nil? && @message_length_limit < msg.bytesize
+ log.info "The received data is larger than 'message_length_limit', dropped:", limit: @message_length_limit, size: msg.bytesize, head: msg[...32]
+ next
+ end
+
@parser.parse(msg) do |time, record|
unless time && record
log.warn "pattern not matched", message: msg
@@ -161,6 +195,15 @@ def start
end
router.emit_stream(@tag, es)
buf.slice!(0, pos) if pos > 0
+ # If the buffer size exceeds the limit here, it means that the next message will definitely exceed the limit.
+ # So we should clear the buffer here. Otherwise, it will keep storing useless data until the next delimiter comes.
+ if !@message_length_limit.nil? && @message_length_limit < buf.bytesize
+ log.info "The buffer size exceeds 'message_length_limit', cleared:", limit: @message_length_limit, size: buf.bytesize, head: buf[...32]
+ buf.clear
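+ # Discard the rest of this oversized message, i.e. everything up to the next delimiter. NOTE(review): the flag is a local captured by this callback, so it looks shared by all connections - confirm.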
+ discard_till_next_delimiter = true
+ next
+ end
end
end
end
diff --git a/test/plugin/test_in_tcp.rb b/test/plugin/test_in_tcp.rb
index 46061cb32c..32fb7cca96 100755
--- a/test/plugin/test_in_tcp.rb
+++ b/test/plugin/test_in_tcp.rb
@@ -255,4 +255,76 @@ def create_tcp_socket(host, port, &block)
assert_equal 'hello', event[2]['msg']
end
end
+
+  sub_test_case "message_length_limit" do # Each test below runs once per data set.
+    data("batch_emit", { extract: "" }, keep: true) # no extract section
+    data("single_emit", { extract: "<extract>\ntag_key tag\n</extract>\n" }, keep: true) # tag_key should select the :in_tcp_server_single_emit branch
+    test "drop records exceeding limit" do |data|
+      message_length_limit = 10
+      d = create_driver(base_config + %!
+        message_length_limit #{message_length_limit}
+        <parse>
+          @type none
+        </parse>
+        #{data[:extract]}
+      !)
+      d.run(expect_records: 2, timeout: 10) do
+        create_tcp_socket('127.0.0.1', @port) do |sock|
+          sock.send("a" * message_length_limit + "\n", 0) # exactly at the limit: kept (the check is a strict "limit < size")
+          sock.send("b" * (message_length_limit + 1) + "\n", 0) # one byte over the limit: dropped
+          sock.send("c" * (message_length_limit - 1) + "\n", 0) # under the limit: kept
+        end
+      end
+
+      expected_records = [
+        "a" * message_length_limit,
+        "c" * (message_length_limit - 1)
+      ]
+      actual_records = d.events.collect do |event|
+        event[2]["message"]
+      end
+
+      assert_equal expected_records, actual_records
+    end
+
+    test "clear buffer and discard the subsequent data until the next delimiter" do |data|
+      message_length_limit = 12
+      d = create_driver(base_config + %!
+        message_length_limit #{message_length_limit}
+        delimiter ";"
+        <parse>
+          @type json
+        </parse>
+        #{data[:extract]}
+      !)
+      d.run(expect_records: 1, timeout: 10) do
+        create_tcp_socket('127.0.0.1', @port) do |sock|
+          sock.send('{"message":', 0) # 11 bytes, no delimiter yet: still under the limit
+          sock.send('"hello', 0) # 17 bytes buffered without a delimiter: over the limit
+          sleep 1 # Give the server time to read the data sent so far and clear its buffer before the rest arrives.
+          sock.send('world!"};', 0) # Tail of the oversized message; it must be discarded so that no parsing failure occurs.
+          sock.send('{"k":"v"};', 0) # A complete message within the limit; it must be parsed successfully.
+        end
+      end
+
+      logs = d.logs.collect do |log| # strip the leading timestamp so the log lines can be compared verbatim
+        log.gsub(/\A\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} [-+]\d{4} /, "")
+      end
+      actual_records = d.events.collect do |event|
+        event[2]
+      end
+
+      assert_equal(
+        {
+          # Exactly one log line is expected here, so this also asserts that the '[warn]: pattern not matched message="world!\"}"' warning does not occur.
+          logs: ['[info]: The buffer size exceeds \'message_length_limit\', cleared: limit=12 size=17 head="{\"message\":\"hello"' + "\n"],
+          records: [{"k" => "v"}],
+        },
+        {
+          logs: logs[1..], # skip the first log line (presumably the "listening tcp socket" startup message - confirm)
+          records: actual_records,
+        }
+      )
+    end
+  end
end