Getting "dump an error event: error_class=Fluent::Plugin::ConcatFilter::TimeoutError" #37

Closed
ntanh1 opened this issue Sep 26, 2017 · 18 comments

Comments

@ntanh1

ntanh1 commented Sep 26, 2017

Hi team,

I'm using the concat plugin v2.1.0 in my Fluentd container; the config is as follows:

<source>
  type forward
  port 24224
  bind 0.0.0.0
</source>

<filter *.*>
  @type concat
  key log
  separator ""
  stream_identity_key container_id
  multiline_start_regexp /^---SL---/
  multiline_end_regexp /^---EL---&/
  flush_interval 10
</filter>

<filter *.*>
  @type record_transformer

  #this allows ruby syntax in the below conversion
  enable_ruby true
  <record>
    log ${record["log"] != nil ? record["log"].sub('---SL---','') : ''}
  </record>
</filter>

<filter *.*>
  @type record_transformer

  #this allows ruby syntax in the below conversion
  enable_ruby true
  <record>
    log ${record["log"] != nil ? record["log"].sub('---EL---&','') : ''}
  </record>
</filter>

(... further processing filters omitted ...)

So each of my log events marks its start point with ---SL--- and its end point with ---EL---&.
There is a Java app running in another container that uses the fluentd logging driver.
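
For illustration, a minimal sketch (plain Ruby, with hypothetical sample strings) of how the two configured regexps are meant to classify the parts of an event that arrives split across several records; the plugin's actual matching code is quoted further down in this thread:

# Minimal sketch: classify parts of a split event with the configured markers.
# The sample strings below are hypothetical, not taken from the real logs.
start_re = /^---SL---/
end_re   = /^---EL---&/

first_part = '---SL---{"date":1506410826922,"level":"INFO",'
last_part  = '---EL---&'

puts !!start_re.match(first_part)  # => true, opens a new buffered event
puts !!end_re.match(last_part)     # => true, closes and flushes the buffered event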

The problem is that I'm getting a timeout flush for some random events, e.g.:

2017-09-26 07:27:16 +0000 [warn]: #0 fluent/log.rb:336:call: dump an error event: error_class=Fluent::Plugin::ConcatFilter::TimeoutError error="Timeout flush: docker.5eabe1bb5d52:5eabe1bb5d52251a978270f141ba2657c1e2ac3a5febfe081e48f0039aae7646" tag="docker.5eabe1bb5d52" time=#<Fluent::EventTime:0x007fd9c861a9c8 @sec=1506410836, @nsec=182951490> record={"container_name"=>"/gfast-sim-id-1-1", "source"=>"stdout", "log"=>"---SL---{\"date\":1506410826922,\"level\":\"INFO\",\"thread\":\"TestANV.1-1-1-Thread-1\",\"category\":\"com.alcatel.netconf.simulator.fwk.DeviceServer\",\"message\":\"TestANV.1-1-1 says hello with : NetconfClientInfo{username\\u003d\\u0027TLS-CLIENT\\u0027, sessionId\\u003d159, m_remoteHost\\u003d\\u0027anv\\u0027, m_remotePort\\u003d\\u00276524\\u0027}\"}---EL---&\r", "container_id"=>"5eabe1bb5d52251a978270f141ba2657c1e2ac3a5febfe081e48f0039aae7646"}

As you can see, the log event is complete; we are not waiting for more pieces of that event to concatenate, so I cannot understand why the timeout happened.
It occurs quite randomly, sometimes with TestANV.1-1-1, sometimes with TestANV.1-1-2 (I have 5 such entities).

Can someone please help?

@okkez
Member

okkez commented Sep 27, 2017

This is caused by a race condition, which I've fixed. Could you try #38?
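
For context, here is a minimal, self-contained sketch (plain Ruby, not the plugin's actual code) of the kind of race being referred to: a timer thread flushing a shared buffer while the filter thread is still appending to it. Serializing both sides with the same Mutex, as filter_concat.rb does with @mutex, avoids the race:

# Hypothetical illustration of a timer thread and a filter thread sharing a buffer.
buffer = []
mutex  = Mutex.new

filter_thread = Thread.new do
  10_000.times { mutex.synchronize { buffer << "line" } }  # events being buffered
end

timer_thread = Thread.new do
  10.times do
    mutex.synchronize { buffer.clear }                      # periodic "timeout flush"
    sleep 0.001
  end
end

[filter_thread, timer_thread].each(&:join)
puts "lines left in buffer: #{buffer.size}"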

@ntanh1
Author

ntanh1 commented Sep 27, 2017

Thanks, I'll check and get back to you.

ntanh1 closed this as completed Sep 27, 2017
ntanh1 reopened this Sep 27, 2017
@ntanh1
Author

ntanh1 commented Sep 27, 2017

Still happening:

2017-09-27 05:23:55 +0000 [warn]: #0 fluent/log.rb:336:call: dump an error event: error_class=Fluent::Plugin::ConcatFilter::TimeoutError error="Timeout flush: docker.d1ed37c3657a:d1ed37c3657a0c5e5f1491f8fc4bdc0710b524ef37115a2871362a11bcd42e45" tag="docker.d1ed37c3657a" time=#<Fluent::EventTime:0x007fee2028c178 @sec=1506489835, @nsec=915635486> record={"container_id"=>"d1ed37c3657a0c5e5f1491f8fc4bdc0710b524ef37115a2871362a11bcd42e45", "container_name"=>"/gfast-sim-id-1-1", "source"=>"stdout", "log"=>"---SL---{\"date\":1506489825384,\"level\":\"INFO\",\"thread\":\"TestANV.1-1-5-Thread-1\",\"category\":\"com.alcatel.netconf.simulator.fwk.DeviceServer\",\"message\":\"TestANV.1-1-5 says hello with : NetconfClientInfo{username\\u003d\\u0027TLS-CLIENT\\u0027, sessionId\\u003d433, m_remoteHost\\u003d\\u0027anv\\u0027, m_remotePort\\u003d\\u00276524\\u0027}\"}---EL---&\r"}
2017-09-27 05:23:55 +0000 [info]: #0 fluent/log.rb:316:call: Timeout flush: docker.d1ed37c3657a:d1ed37c3657a0c5e5f1491f8fc4bdc0710b524ef37115a2871362a11bcd42e45
bash-4.3# cat ./usr/lib/ruby/gems/2.2.0/gems/fluent-plugin-concat-2.1.0/lib/fluent/plugin/filter_concat.rb
require "fluent/plugin/filter"

module Fluent::Plugin
  class ConcatFilter < Filter
    Fluent::Plugin.register_filter("concat", self)

    helpers :timer, :event_emitter

    desc "The key for part of multiline log"
    config_param :key, :string
    desc "The separator of lines"
    config_param :separator, :string, default: "\n"
    desc "The number of lines"
    config_param :n_lines, :integer, default: nil
    desc "The regexp to match beginning of multiline"
    config_param :multiline_start_regexp, :string, default: nil
    desc "The regexp to match ending of multiline"
    config_param :multiline_end_regexp, :string, default: nil
    desc "The regexp to match continuous lines"
    config_param :continuous_line_regexp, :string, default: nil
    desc "The key to determine which stream an event belongs to"
    config_param :stream_identity_key, :string, default: nil
    desc "The interval between data flushes, 0 means disable timeout"
    config_param :flush_interval, :time, default: 60
    desc "The label name to handle timeout"
    config_param :timeout_label, :string, default: nil
    desc "Use timestamp of first record when buffer is flushed"
    config_param :use_first_timestamp, :bool, default: false

    class TimeoutError < StandardError
    end

    def initialize
      super

      @buffer = Hash.new {|h, k| h[k] = [] }
      @timeout_map = Hash.new {|h, k| h[k] = Fluent::Engine.now }
      @mutex = Thread::Mutex.new
    end

    def configure(conf)
      super

      if @n_lines && @multiline_start_regexp
        raise Fluent::ConfigError, "n_lines and multiline_start_regexp are exclusive"
      end
      if @n_lines.nil? && @multiline_start_regexp.nil?
        raise Fluent::ConfigError, "Either n_lines or multiline_start_regexp is required"
      end

      @mode = nil
      case
      when @n_lines
        @mode = :line
      when @multiline_start_regexp
        @mode = :regexp
        @multiline_start_regexp = Regexp.compile(@multiline_start_regexp[1..-2])
        if @multiline_end_regexp
          @multiline_end_regexp = Regexp.compile(@multiline_end_regexp[1..-2])
        end
        if @continuous_line_regexp
          @continuous_line_regexp = Regexp.compile(@continuous_line_regexp[1..-2])
        end
      end
    end

    def start
      super
      @finished = false
      timer_execute(:filter_concat_timer, 1, &method(:on_timer))
    end

    def shutdown
      @finished = true
      flush_remaining_buffer
      super
    end

    def filter_stream(tag, es)
      new_es = Fluent::MultiEventStream.new
      es.each do |time, record|
        if /\Afluent\.(?:trace|debug|info|warn|error|fatal)\z/ =~ tag
          new_es.add(time, record)
          next
        end
        begin
          flushed_es = process(tag, time, record)
          unless flushed_es.empty?
            flushed_es.each do |_time, new_record|
              time = _time if @use_first_timestamp
              new_es.add(time, record.merge(new_record))
            end
          end
        rescue => e
          router.emit_error_event(tag, time, record, e)
        end
      end
      new_es
    end

    private

    def on_timer
      return if @flush_interval <= 0
      return if @finished
      flush_timeout_buffer
    end

    def process(tag, time, record)
      if @stream_identity_key
        stream_identity = "#{tag}:#{record[@stream_identity_key]}"
      else
        stream_identity = "#{tag}:default"
      end
      @timeout_map[stream_identity] = Fluent::Engine.now
      case @mode
      when :line
        process_line(stream_identity, tag, time, record)
      when :regexp
        process_regexp(stream_identity, tag, time, record)
      end
    end

    def process_line(stream_identity, tag, time, record)
      new_es = Fluent::MultiEventStream.new
      @mutex.synchronize do
        @buffer[stream_identity] << [tag, time, record]
      end
      if @buffer[stream_identity].size >= @n_lines
        new_time, new_record = @mutex.synchronize do
          flush_buffer(stream_identity)
        end
        time = new_time if @use_first_timestamp
        new_es.add(time, new_record)
      end
      new_es
    end

    def process_regexp(stream_identity, tag, time, record)
      new_es = Fluent::MultiEventStream.new
      case
      when firstline?(record[@key])
        if @buffer[stream_identity].empty?
          @mutex.synchronize do
            @buffer[stream_identity] << [tag, time, record]
          end
          if lastline?(record[@key])
            new_time, new_record = @mutex.synchronize do
              flush_buffer(stream_identity)
            end
            time = new_time if @use_first_timestamp
            new_es.add(time, new_record)
          end
        else
          new_time, new_record = @mutex.synchronize do
            flush_buffer(stream_identity, [tag, time, record])
          end
          time = new_time if @use_first_timestamp
          new_es.add(time, new_record)
          if lastline?(record[@key])
            new_time, new_record = @mutex.synchronize do
              flush_buffer(stream_identity)
            end
            time = new_time if @use_first_timestamp
            new_es.add(time, new_record)
          end
          return new_es
        end
      when lastline?(record[@key])
        @mutex.synchronize do
          @buffer[stream_identity] << [tag, time, record]
          new_time, new_record = flush_buffer(stream_identity)
        end
        time = new_time if @use_first_timestamp
        new_es.add(time, new_record)
        return new_es
      else
        if @buffer[stream_identity].empty?
          new_es.add(time, record)
          return new_es
        else
          if continuous_line?(record[@key])
            # Continuation of the previous line
            @mutex.synchronize do
              @buffer[stream_identity] << [tag, time, record]
            end
          else
            new_time, new_record = @mutex.synchronize do
              flush_buffer(stream_identity)
            end
            time = new_time if @use_first_timestamp
            new_es.add(time, new_record)
            new_es.add(time, record)
          end
        end
      end
      new_es
    end

    def firstline?(text)
      @multiline_start_regexp && !!@multiline_start_regexp.match(text)
    end

    def lastline?(text)
      @multiline_end_regexp && !!@multiline_end_regexp.match(text)
    end

    def continuous_line?(text)
      if @continuous_line_regexp
        !!@continuous_line_regexp.match(text)
      else
        true
      end
    end

    def flush_buffer(stream_identity, new_element = nil)
      lines = @buffer[stream_identity].map {|_tag, _time, record| record[@key] }
      _tag, time, first_record = @buffer[stream_identity].first
      new_record = {
        @key => lines.join(@separator)
      }
      @buffer[stream_identity] = []
      @buffer[stream_identity] << new_element if new_element
      [time, first_record.merge(new_record)]
    end

    def flush_timeout_buffer
      now = Fluent::Engine.now
      timeout_stream_identities = []
      @timeout_map.each do |stream_identity, previous_timestamp|
        next if @flush_interval > (now - previous_timestamp)
        next if @buffer[stream_identity].empty?
        next if @mutex.locked?
        time, flushed_record = @mutex.synchronize do
          flush_buffer(stream_identity)
        end
        timeout_stream_identities << stream_identity
        tag = stream_identity.split(":").first
        message = "Timeout flush: #{stream_identity}"
        handle_timeout_error(tag, @use_first_timestamp ? time : now, flushed_record, message)
        log.info(message)
      end
      @timeout_map.reject! do |stream_identity, _|
        timeout_stream_identities.include?(stream_identity)
      end
    end

    def flush_remaining_buffer
      @buffer.each do |stream_identity, elements|
        next if elements.empty?

        lines = elements.map {|_tag, _time, record| record[@key] }
        new_record = {
          @key => lines.join(@separator)
        }
        tag, time, record = elements.first
        message = "Flush remaining buffer: #{stream_identity}"
        handle_timeout_error(tag, time, record.merge(new_record), message)
        log.info(message)
      end
      @buffer.clear
    end

    def handle_timeout_error(tag, time, record, message)
      if @timeout_label
        event_router = event_emitter_router(@timeout_label)
        event_router.emit(tag, time, record)
      else
        router.emit_error_event(tag, time, record, TimeoutError.new(message))
      end
    end
  end
end
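
For reference, the timeout path in the source above boils down to the following check (a condensed, self-contained sketch using Time.now in place of Fluent::Engine.now; the stream identity and timestamps are hypothetical):

# A stream is flushed and reported as TimeoutError once flush_interval seconds
# have passed since its last record and its buffer is non-empty.
flush_interval = 10
timeout_map = { "docker.abc123:containerid" => Time.now - 15 }  # last record seen 15s ago
buffer      = { "docker.abc123:containerid" => [["docker.abc123", Time.now - 15, { "log" => "---SL---..." }]] }

stale = timeout_map.select do |stream_identity, last_seen|
  (Time.now - last_seen) >= flush_interval && !buffer[stream_identity].empty?
end
puts stale.keys.inspect  # streams that would be flushed and emitted as error events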

@okkez
Member

okkez commented Sep 27, 2017

Still happening

Is this easy to reproduce? Could you show me a minimal reproduction sequence?
I could not reproduce this in my local environment.

@ntanh1
Author

ntanh1 commented Sep 27, 2017

Yes, it's reproducible all the time.

I 'create' my containers, drop in the .rb file with your fix, then 'up' my containers.

version: '2'
services:
    fluentd:
        image: fnms-fluent
        container_name: fnms-fluentd  
        restart: always
        ports:
            - "24224:24224"
        environment:
            - ES_IP=elasticsearch
            - ES_PORT=9200
            - KIBANA_VERSION=5.2.2
            - OPENTSDB_URL=http://opentsdb:4242
        links:
            - elasticsearch
        depends_on:
            - elasticsearch

    gfast-simulator-1:
        image: gfast-simulator-v2
        container_name: gfast-sim-id-1-1
        ports:
            - "1023:1023"
            - "9500:9500"
        environment:
            - com_nokia_anv_simulator_circuit_id_prefix=TestANV.1-1-
        stdin_open: true
        tty: true
        logging:
            driver: fluentd
            options:
               fluentd-address: "0.0.0.0:24224"
               tag: "docker.{{.ID}}"
        depends_on:
            - fluentd
            - anv
...

Logging works for some time; the issue happens when there are 5 log events: usually 1 event goes missing and times out (in this case TestANV.1-1-5):

TestANV.1-1-1 says hello with : NetconfClientInfo{username='TLS-CLIENT', sessionId=432, m_remoteHost='anv', m_remotePort='6524'}

TestANV.1-1-3 says hello with : NetconfClientInfo{username='TLS-CLIENT', sessionId=431, m_remoteHost='anv', m_remotePort='6524'}

TestANV.1-1-2 says hello with : NetconfClientInfo{username='TLS-CLIENT', sessionId=429, m_remoteHost='anv', m_remotePort='6524'}

TestANV.1-1-4 says hello with : NetconfClientInfo{username='TLS-CLIENT', sessionId=430, m_remoteHost='anv', m_remotePort='6524'}

@okkez
Member

okkez commented Sep 27, 2017

Hmm, I could not reproduce this with docker-compose and cat-ing a log file.

The log file is like the following:

---SL---
1aaaa
bbbb
cccc
dddd
eeee
ffff
---EL---&
(snip)
---SL---
5aaaa
bbbb
cccc
dddd
eeee
ffff
---EL---&

BTW, I noticed that your error message contains ---EL---&\r, i.e. it ends with \r. This is strange behavior, I think.
I tested with both an LF log file and a CRLF log file, but fluentd does not show \r at all.
I believe a \n follows after the ---EL---&\r.

Could you take a sigdump when the timeout occurs?
https://docs.fluentd.org/v0.12/articles/trouble-shooting#dump-fluentd-internal-information

And one more question: does logging keep running after the timeout, or does it stop?

@ntanh1
Author

ntanh1 commented Sep 27, 2017

That log event is just the same as the others; I don't know if the others have \r. Let me know if you need any further info.

Please find the sigdumps below:
sigdump-12.log
sigdump-25.log

PID   USER     TIME   COMMAND
    1 root       0:00 /bin/sh -c ./startFluentd.sh
   11 root       0:00 {startFluentd.sh} /bin/bash ./startFluentd.sh
   12 root       0:01 {fluentd} /usr/bin/ruby /usr/bin/fluentd -c /fluentd/etc/fluent.conf -p /fluentd/plugins -v
   25 root       0:06 /usr/bin/ruby -Eascii-8bit:ascii-8bit /usr/bin/fluentd -c /fluentd/etc/fluent.conf -p /fluentd/plugins -v --under-supervisor
  151 root       0:00 bash
  158 root       0:00 ps
bash-4.3# kill -CONT 12;kill -CONT 25

I can see that logging has stopped after the timeout.

@okkez
Member

okkez commented Sep 28, 2017

I want a step-by-step reproduction sequence and the full configuration so that I can reproduce and debug the issue in my local environment.

I can see that logging has stopped after the timeout.

Next, I want to know whether the logs sent from the docker containers reach Fluentd's in_forward or not.
You can use Wireshark or tcpdump for this purpose.

Another way is to use filter_stdout like the following.
Insert filter_stdout before filter_concat.

<source>
  @type forward
  bind 0.0.0.0
  port 24224
</source>

<filter *.*>
  @type stdout
</filter>

<filter *.*>
  @type concat
  (snip)
</filter>
(snip)

@okkez
Member

okkez commented Sep 28, 2017

Could you tell me the following info?

  • Docker version
  • docker-compose version
  • About your docker images

@ntanh1
Author

ntanh1 commented Sep 28, 2017

  • Docker version: Docker version 17.05.0-ce, build 89658be
  • Docker-compose: docker-compose version 1.8.0, build f3628c7
  • My docker image contains a Java application; it uses log4j with a custom JsonLayout to produce logs as below:
public class JsonLayout extends Layout {
    private static final String START_LOG_TOKEN = "---SL---";
    private static final String END_LOG_TOKEN = "---EL---&";
...
    @Override
    public String format(LoggingEvent event) {
        Map<String, Object> jsonMapping = new HashMap<String, Object>();
        Map<String, Object> properties = parseLoggingProperties(event);
        jsonMapping.putAll(properties);
        jsonMapping.put("date", event.getTimeStamp());
        jsonMapping.put("level", event.getLevel().toString());
        jsonMapping.put("thread", event.getThreadName());
        jsonMapping.put("category", event.getLoggerName());
        jsonMapping.put("message", event.getMessage());
        jsonMapping.put("throwable", formatThrowable(event));
        return START_LOG_TOKEN + m_gson.toJson(jsonMapping) + END_LOG_TOKEN + "\n";
    }
...
}

Full config:

<source>
  type forward
  port 24224
  bind 0.0.0.0
</source>

<filter *.*>
  @type concat
  key log
  separator ""
  stream_identity_key container_id
  multiline_start_regexp /^---SL---/
  multiline_end_regexp /^---EL---&/
  flush_interval 10
</filter>


<filter *.*>
  @type record_transformer

  #this allows ruby syntax in the below conversion
  enable_ruby true
  <record>
    log ${record["log"] != nil ? record["log"].sub('---SL---','') : ''}
  </record>
</filter>

<filter *.*>
  @type record_transformer

  #this allows ruby syntax in the below conversion
  enable_ruby true
  <record>
    log ${record["log"] != nil ? record["log"].sub('---EL---&','') : ''}
  </record>
</filter>

#this filter handles log events in Json format. If log format is not in Json (mostly from echo, see FNMS-7280), this logs a warning but still
# moving to the next filters
<filter *.*>
  @type parser
  format json
  key_name log
  reserve_data true
  suppress_parse_error_log true
</filter>

#dedicated for Syncope containers, we configure this as a logging tag in yml file
<filter syncope.*>
  @type record_transformer
  enable_ruby true
  auto_typecast true #to keep timeMillis as float instead of String
  <record>
    date ${record["timeMillis"]} #add new field date, value copied from timeMillis
    category ${record["loggerName"]} #add new field category, value copied from loggerName
    throwable -${record["thrown"]}
  </record>
  remove_keys timeMillis,endOfBatch,loggerFqcn,threadId,threadPriority,loggerName,thrown #remove uninteresting stuffs
</filter>


<filter *.*>
  @type record_transformer

  #this allows ruby syntax in the below conversion
  enable_ruby true
  <record>
    # nil might be log from fluent itself
    container_name ${record["container_name"] != nil ? record["container_name"].sub('/','') : 'fluentd'}
  </record>
</filter>

<filter *.*>
  @type record_transformer

  #this allows ruby syntax in the below conversion
  enable_ruby true
  <record>
    #convert date from milli seconds to %Y-%m-%dT%H:%M:%S.%3N%z format. If 'date' is not available in log event (mostly from echo, see FNMS-7280),
    # then take from Fluentd local time
    date ${record["date"] != nil ? Time.at(record["date"]/1000.0).strftime('%Y-%m-%dT%H:%M:%S.%3N%z') : Time.now.strftime('%Y-%m-%dT%H:%M:%S.%3N%z')}

    #if 'message' is not available in log event (mostly from echo, see FNMS-7280), then take from the 'log' field.
    message ${record["message"] != nil ? record["message"] : record["log"]}

    #if 'category' is not available in log event (mostly from echo, see FNMS-7280), then take from the container name.
    category ${record["category"] != nil ? record["category"] : record["container_name"]}
  </record>
  remove_keys log
</filter>

<match *.*>
    @type rewrite_tag_filter
    capitalize_regex_backreference yes
    rewriterule1 category !^(com\.nokia\.anv\.sm\.)(api\.MetricManager|consumer\.MetricAnnotationHandler)$ logstash-normal
    rewriterule2 category .+                                                                               logstash-metric
</match>

<match logstash-metric>
    @type opentsdb_metrics
    url "#{ENV['OPENTSDB_URL']}"
</match>

<match logstash-*>
    @type forest
    subtype elasticsearch
    <template>
        reload_on_failure true
        reconnect_on_error true
        logstash_format true
        type_name fluentd
        host "#{ENV['ES_IP']}"
        port "#{ENV['ES_PORT']}"
        buffer_type file
        buffer_chunk_limit 16m
        buffer_queue_limit 4096
        buffer_path /opt/fluentd/buffer/
        retry_wait 10s
        flush_interval 5s
        time_key date
        time_key_exclude_timestamp true
    </template>
</match>

OK, I will try with your config and get back to you.

@ntanh1
Author

ntanh1 commented Sep 28, 2017

I got the following result:

2017-09-28 03:22:05.000000000 +0000 docker.18c8019ac54f: {"container_name":"/gfast-sim-id-1-1","source":"stdout","log":"---SL---{\"date\":1506568925190,\"level\":\"INFO\",\"thread\":\"TestANV.1-1-2-Thread-1\",\"category\":\"com.alcatel.netconf.simulator.fwk.DeviceServer\",\"message\":\"TestANV.1-1-2 says hello with : NetconfClientInfo{username\\u003d\\u0027TLS-CLIENT\\u0027, sessionId\\u003d184, m_remoteHost\\u003d\\u0027anv\\u0027, m_remotePort\\u003d\\u00276524\\u0027}\"}---EL---&\r","container_id":"18c8019ac54ff62ca0d2bf6c47beb17720c5a8c5c5a4f42c1f9a4e1ee2c19214"}
2017-09-28 03:22:05.000000000 +0000 docker.18c8019ac54f: {"container_name":"/gfast-sim-id-1-1","source":"stdout","log":"---SL---{\"date\":1506568925233,\"level\":\"INFO\",\"thread\":\"TestANV.1-1-1-Thread-1\",\"category\":\"com.alcatel.netconf.simulator.fwk.DeviceServer\",\"message\":\"TestANV.1-1-1 says hello with : NetconfClientInfo{username\\u003d\\u0027TLS-CLIENT\\u0027, sessionId\\u003d185, m_remoteHost\\u003d\\u0027anv\\u0027, m_remotePort\\u003d\\u00276524\\u0027}\"}---EL---&\r","container_id":"18c8019ac54ff62ca0d2bf6c47beb17720c5a8c5c5a4f42c1f9a4e1ee2c19214"}
2017-09-28 03:22:15 +0000 [warn]: #0 fluent/log.rb:336:call: dump an error event: error_class=Fluent::Plugin::ConcatFilter::TimeoutError error="Timeout flush: docker.18c8019ac54f:18c8019ac54ff62ca0d2bf6c47beb17720c5a8c5c5a4f42c1f9a4e1ee2c19214" tag="docker.18c8019ac54f" time=#<Fluent::EventTime:0x007f6862939168 @sec=1506568935, @nsec=472123088> record={"container_name"=>"/gfast-sim-id-1-1", "source"=>"stdout", "log"=>"---SL---{\"date\":1506568925233,\"level\":\"INFO\",\"thread\":\"TestANV.1-1-1-Thread-1\",\"category\":\"com.alcatel.netconf.simulator.fwk.DeviceServer\",\"message\":\"TestANV.1-1-1 says hello with : NetconfClientInfo{username\\u003d\\u0027TLS-CLIENT\\u0027, sessionId\\u003d185, m_remoteHost\\u003d\\u0027anv\\u0027, m_remotePort\\u003d\\u00276524\\u0027}\"}---EL---&\r", "container_id"=>"18c8019ac54ff62ca0d2bf6c47beb17720c5a8c5c5a4f42c1f9a4e1ee2c19214"}
2017-09-28 03:22:15 +0000 [info]: #0 fluent/log.rb:316:call: Timeout flush: docker.18c8019ac54f:18c8019ac54ff62ca0d2bf6c47beb17720c5a8c5c5a4f42c1f9a4e1ee2c19214
2017-09-28 03:22:15.472280020 +0000 fluent.warn: {"error":"#<Fluent::Plugin::ConcatFilter::TimeoutError: Timeout flush: docker.18c8019ac54f:18c8019ac54ff62ca0d2bf6c47beb17720c5a8c5c5a4f42c1f9a4e1ee2c19214>","tag":"docker.18c8019ac54f","time":1506568935,"record":{"container_name":"/gfast-sim-id-1-1","source":"stdout","log":"---SL---{\"date\":1506568925233,\"level\":\"INFO\",\"thread\":\"TestANV.1-1-1-Thread-1\",\"category\":\"com.alcatel.netconf.simulator.fwk.DeviceServer\",\"message\":\"TestANV.1-1-1 says hello with : NetconfClientInfo{username\\u003d\\u0027TLS-CLIENT\\u0027, sessionId\\u003d185, m_remoteHost\\u003d\\u0027anv\\u0027, m_remotePort\\u003d\\u00276524\\u0027}\"}---EL---&\r","container_id":"18c8019ac54ff62ca0d2bf6c47beb17720c5a8c5c5a4f42c1f9a4e1ee2c19214"},"message":"dump an error event: error_class=Fluent::Plugin::ConcatFilter::TimeoutError error=\"Timeout flush: docker.18c8019ac54f:18c8019ac54ff62ca0d2bf6c47beb17720c5a8c5c5a4f42c1f9a4e1ee2c19214\" tag=\"docker.18c8019ac54f\" time=#<Fluent::EventTime:0x007f6862939168 @sec=1506568935, @nsec=472123088> record={\"container_name\"=>\"/gfast-sim-id-1-1\", \"source\"=>\"stdout\", \"log\"=>\"---SL---{\\\"date\\\":1506568925233,\\\"level\\\":\\\"INFO\\\",\\\"thread\\\":\\\"TestANV.1-1-1-Thread-1\\\",\\\"category\\\":\\\"com.alcatel.netconf.simulator.fwk.DeviceServer\\\",\\\"message\\\":\\\"TestANV.1-1-1 says hello with : NetconfClientInfo{username\\\\u003d\\\\u0027TLS-CLIENT\\\\u0027, sessionId\\\\u003d185, m_remoteHost\\\\u003d\\\\u0027anv\\\\u0027, m_remotePort\\\\u003d\\\\u00276524\\\\u0027}\\\"}---EL---&\\r\", \"container_id\"=>\"18c8019ac54ff62ca0d2bf6c47beb17720c5a8c5c5a4f42c1f9a4e1ee2c19214\"}"}
2017-09-28 03:22:15.472687269 +0000 fluent.info: {"message":"Timeout flush: docker.18c8019ac54f:18c8019ac54ff62ca0d2bf6c47beb17720c5a8c5c5a4f42c1f9a4e1ee2c19214"}

There is a timeout for TestANV.1-1-1 but not for TestANV.1-1-2.

@okkez
Member

okkez commented Sep 28, 2017

Thank you for your info.

Got following result:

It seems that filter_stdout is not in place here.
I want to check whether the log events from the Docker logging driver reach Fluentd.
filter_stdout placed before filter_concat can display the raw log (multiple lines).

If filter_stdout does not display the raw log after the timeout occurs, I think fluent-plugin-concat does not have a problem.

BTW, Gson.toJson can generate a single-line JSON string. So you can stop using fluent-plugin-concat if you can generate a single-line JSON string from your JsonLayout.format.

@okkez
Member

okkez commented Sep 28, 2017

Could you try a very simple configuration like the following?
If the problem still reproduces, there is no problem on the fluent-plugin-concat side.

<source>
  @type forward
  bind 0.0.0.0
  port 24224
</source>

<match *.*>
  @type stdout # or something
</match>

And could you also try without filter_concat?
If the problem still reproduces, there is no problem in fluent-plugin-concat.

@ntanh1
Author

ntanh1 commented Sep 28, 2017

The background of using the concat plugin is moby/moby#32923: our app might produce a long, multi-line log that exceeds the 16KB limit of the docker logging driver. In that case the log is split and no longer in JSON format, so I really need your plugin to help recover the JSON format.
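
For illustration, a minimal sketch (plain Ruby, with hypothetical record contents) of what reassembling such a split record amounts to: joining the "log" parts with the configured separator ("") and stripping the markers recovers a parseable JSON string:

# Two hypothetical partial records produced by the logging driver splitting one long line.
require "json"

parts = [
  { "log" => '---SL---{"date":1506568925233,"level":"INFO","message":"first half ' },
  { "log" => 'second half"}---EL---&' },
]
joined = parts.map { |r| r["log"] }.join("")           # separator "" from the config
json   = joined.sub("---SL---", "").sub("---EL---&", "")
p JSON.parse(json)["message"]                          # => "first half second half"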

Before using the concat plugin we never observed timeout issues or missing logs; as you can see, the error is coming from Fluent::Plugin::ConcatFilter::TimeoutError.

The good news is that it seems the app was simply not logging anymore at that point; the concat plugin itself does not stop the logs.

Now my workaround is to treat timed-out logs as NORMAL; with that, I'm able to receive all events:

September 28th 2017, 15:56:32.479	TestANV.1-1-5 says hello with : NetconfClientInfo{username='TLS-CLIENT', sessionId=220, m_remoteHost='anv', m_remotePort='6524'}
September 28th 2017, 15:56:32.450	TestANV.1-1-4 says hello with : NetconfClientInfo{username='TLS-CLIENT', sessionId=219, m_remoteHost='anv', m_remotePort='6524'}
September 28th 2017, 15:56:32.178	TestANV.1-1-1 says hello with : NetconfClientInfo{username='TLS-CLIENT', sessionId=216, m_remoteHost='anv', m_remotePort='6524'}
September 28th 2017, 15:56:32.177	TestANV.1-1-3 says hello with : NetconfClientInfo{username='TLS-CLIENT', sessionId=217, m_remoteHost='anv', m_remotePort='6524'}
September 28th 2017, 15:56:32.176	TestANV.1-1-2 says hello with : NetconfClientInfo{username='TLS-CLIENT', sessionId=218, m_remoteHost='anv', m_remotePort='6524'}

My config is as below:

<source>
  type forward
  port 24224
  bind 0.0.0.0
</source>

<filter *.*>
  @type concat
  key log
  separator ""
  stream_identity_key container_id
  multiline_start_regexp /^---SL---/
  multiline_end_regexp /^---EL---&/
  flush_interval 10
  timeout_label @NORMAL
</filter>

<match **>
  @type relabel
  @label @NORMAL
</match>

<label @NORMAL>
    <filter *.*>
      @type record_transformer

      #this allows ruby syntax in the below conversion
      enable_ruby true
      <record>
        log ${record["log"] != nil ? record["log"].sub('---SL---','') : ''}
      </record>
    </filter>

    <filter *.*>
      @type record_transformer

      #this allows ruby syntax in the below conversion
      enable_ruby true
      <record>
        log ${record["log"] != nil ? record["log"].sub('---EL---&','') : ''}
      </record>
    </filter>

    #this filter handles log events in Json format. If log format is not in Json (mostly from echo, see FNMS-7280), this logs a warning but still
    # moving to the next filters
    <filter *.*>
      @type parser
      format json
      key_name log
      reserve_data true
      suppress_parse_error_log true
    </filter>

    #dedicated for Syncope containers, we configure this as a logging tag in yml file
    <filter syncope.*>
      @type record_transformer
      enable_ruby true
      auto_typecast true #to keep timeMillis as float instead of String
      <record>
        date ${record["timeMillis"]} #add new field date, value copied from timeMillis
        category ${record["loggerName"]} #add new field category, value copied from loggerName
        throwable -${record["thrown"]}
      </record>
      remove_keys timeMillis,endOfBatch,loggerFqcn,threadId,threadPriority,loggerName,thrown #remove uninteresting stuffs
    </filter>


    <filter *.*>
      @type record_transformer

      #this allows ruby syntax in the below conversion
      enable_ruby true
      <record>
        # nil might be log from fluent itself
        container_name ${record["container_name"] != nil ? record["container_name"].sub('/','') : 'fluentd'}
      </record>
    </filter>

    <filter *.*>
      @type record_transformer

      #this allows ruby syntax in the below conversion
      enable_ruby true
      <record>
        #convert date from milli seconds to %Y-%m-%dT%H:%M:%S.%3N%z format. If 'date' is not available in log event (mostly from echo, see FNMS-7280),
        # then take from Fluentd local time
        date ${record["date"] != nil ? Time.at(record["date"]/1000.0).strftime('%Y-%m-%dT%H:%M:%S.%3N%z') : Time.now.strftime('%Y-%m-%dT%H:%M:%S.%3N%z')}

        #if 'message' is not available in log event (mostly from echo, see FNMS-7280), then take from the 'log' field.
        message ${record["message"] != nil ? record["message"] : record["log"]}

        #if 'category' is not available in log event (mostly from echo, see FNMS-7280), then take from the container name.
        category ${record["category"] != nil ? record["category"] : record["container_name"]}
      </record>
      remove_keys log
    </filter>

    <match *.*>
        @type rewrite_tag_filter
        capitalize_regex_backreference yes
        rewriterule1 category !^(com\.nokia\.anv\.sm\.)(api\.MetricManager|consumer\.MetricAnnotationHandler)$ logstash-normal
        rewriterule2 category .+                                                                               logstash-metric
    </match>

    <match logstash-metric>
        @type opentsdb_metrics
        url "#{ENV['OPENTSDB_URL']}"
    </match>

    <match logstash-*>
        @type forest
        subtype elasticsearch
        <template>
            reload_on_failure true
            reconnect_on_error true
            logstash_format true
            type_name fluentd
            host "#{ENV['ES_IP']}"
            port "#{ENV['ES_PORT']}"
            buffer_type file
            buffer_chunk_limit 16m
            buffer_queue_limit 4096
            buffer_path /opt/fluentd/buffer/
            retry_wait 10s
            flush_interval 5s
            time_key date
            time_key_exclude_timestamp true
        </template>
    </match>
</label>

I still see the flush timeout in the fluentd log, though:

2017-09-28 09:03:10 +0000 [info]: #0 fluent/log.rb:316:call: Timeout flush: docker.278e7c9dff31:278e7c9dff3191051738995848c4c14d747bb1d381ebb701403068c0af63f43f

2017-09-28 09:03:25 +0000 [info]: #0 fluent/log.rb:316:call: Timeout flush: docker.278e7c9dff31:278e7c9dff3191051738995848c4c14d747bb1d381ebb701403068c0af63f43f

@okkez
Member

okkez commented Sep 28, 2017

Thank you for the background and the good news!
I didn't know about the docker logging driver limitation.

The good news is that it seems the app was simply not logging anymore at that point; the concat plugin itself does not stop the logs.
Now my workaround is to treat timed-out logs as NORMAL; with that, I'm able to receive all events.

Does the app keep logging?
I hope the workaround is working well.

@ntanh1
Author

ntanh1 commented Sep 28, 2017

Yes, the app keeps logging. The workaround helps!

@okkez
Member

okkez commented Sep 28, 2017

I'm glad. Thanks!

@okkez
Member

okkez commented Oct 24, 2017

Resolved.
