From c7272dcc0f7daf9c2417d43dc5f7b437a7fbd7e7 Mon Sep 17 00:00:00 2001 From: Rich Megginson Date: Thu, 17 Oct 2019 15:40:58 -0600 Subject: [PATCH] Bug 1756920: fluentd pods doesn't process kubernetes events https://bugzilla.redhat.com/show_bug.cgi?id=1756920 Use the new feature of the viaq plugin to only do the event transform for records with a specific tag. --- .../openshift/filter-viaq-data-model.conf | 6 +++ .../fluent-plugin-viaq_data_model.gemspec | 2 +- .../fluent/plugin/filter_viaq_data_model.rb | 19 ++++---- .../plugin/filter_viaq_data_model_systemd.rb | 6 +-- .../test/test_filter_viaq_data_model.rb | 31 +++++++++++++ test/eventrouter.sh | 45 ++++++++++++++++--- 6 files changed, 91 insertions(+), 18 deletions(-) diff --git a/fluentd/configs.d/openshift/filter-viaq-data-model.conf b/fluentd/configs.d/openshift/filter-viaq-data-model.conf index 70dae3a29..bfd079c5b 100644 --- a/fluentd/configs.d/openshift/filter-viaq-data-model.conf +++ b/fluentd/configs.d/openshift/filter-viaq-data-model.conf @@ -34,6 +34,12 @@ type k8s_journal remove_keys "#{ENV['K8S_FILTER_REMOVE_KEYS'] || 'log,stream,MESSAGE,_SOURCE_REALTIME_TIMESTAMP,__REALTIME_TIMESTAMP,CONTAINER_ID,CONTAINER_ID_FULL,CONTAINER_NAME,PRIORITY,_BOOT_ID,_CAP_EFFECTIVE,_CMDLINE,_COMM,_EXE,_GID,_HOSTNAME,_MACHINE_ID,_PID,_SELINUX_CONTEXT,_SYSTEMD_CGROUP,_SYSTEMD_SLICE,_SYSTEMD_UNIT,_TRANSPORT,_UID,_AUDIT_LOGINUID,_AUDIT_SESSION,_SYSTEMD_OWNER_UID,_SYSTEMD_SESSION,_SYSTEMD_USER_UNIT,CODE_FILE,CODE_FUNCTION,CODE_LINE,ERRNO,MESSAGE_ID,RESULT,UNIT,_KERNEL_DEVICE,_KERNEL_SUBSYSTEM,_UDEV_SYSNAME,_UDEV_DEVNODE,_UDEV_DEVLINK,SYSLOG_FACILITY,SYSLOG_IDENTIFIER,SYSLOG_PID'}" + + tag "kubernetes.var.log.containers.eventrouter-** kubernetes.var.log.containers.cluster-logging-eventrouter-**" + type k8s_json_file + remove_keys log,stream,CONTAINER_ID_FULL,CONTAINER_NAME + process_kubernetes_events "#{ENV['TRANSFORM_EVENTS'] || 'true'}" + tag "kubernetes.var.log.containers**" type k8s_json_file diff --git a/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/fluent-plugin-viaq_data_model.gemspec b/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/fluent-plugin-viaq_data_model.gemspec index 9206da2c7..fad8f17be 100644 --- a/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/fluent-plugin-viaq_data_model.gemspec +++ b/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/fluent-plugin-viaq_data_model.gemspec @@ -4,7 +4,7 @@ $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) Gem::Specification.new do |gem| gem.name = "fluent-plugin-viaq_data_model" - gem.version = "0.0.20" + gem.version = "0.0.21" gem.authors = ["Rich Megginson"] gem.email = ["rmeggins@redhat.com"] gem.description = %q{Filter plugin to ensure data is in the ViaQ common data model} diff --git a/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/lib/fluent/plugin/filter_viaq_data_model.rb b/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/lib/fluent/plugin/filter_viaq_data_model.rb index b9c579956..e3b468edf 100644 --- a/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/lib/fluent/plugin/filter_viaq_data_model.rb +++ b/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/lib/fluent/plugin/filter_viaq_data_model.rb @@ -119,6 +119,8 @@ class ViaqDataModelFilter < Filter config_param :tag, :string desc 'remove these keys from the record - same as record_transformer "remove_keys" field' config_param :remove_keys, :string, default: nil + desc 'enable/disable processing of kubernetes events' + config_param :process_kubernetes_events, :bool, default: nil end desc 'Which part of the pipeline is this - collector, normalizer, etc. for pipeline_metadata' @@ -178,7 +180,6 @@ def configure(conf) @formatters.each do |fmtr| matcher = ViaqMatchClass.new(fmtr.tag, nil) fmtr.instance_eval{ @params[:matcher] = matcher } - fmtr.instance_eval{ @params[:fmtr_type] = fmtr.type } if fmtr.remove_keys fmtr.instance_eval{ @params[:fmtr_remove_keys] = fmtr.remove_keys.split(',') } else @@ -193,6 +194,8 @@ def configure(conf) fmtr_func = method(:process_k8s_json_file_fields) end fmtr.instance_eval{ @params[:fmtr_func] = fmtr_func } + proc_k8s_ev = fmtr.process_kubernetes_events.nil? ? @process_kubernetes_events : fmtr.process_kubernetes_events + fmtr.instance_eval{ @params[:process_kubernetes_events] = proc_k8s_ev } end @formatter_cache = {} @formatter_cache_nomatch = {} @@ -200,7 +203,7 @@ def configure(conf) begin @docker_hostname = File.open('/etc/docker-hostname') { |f| f.readline }.rstrip rescue - @docker_hostname = nil + @docker_hostname = ENV['NODE_NAME'] || nil end @ipaddr4 = ENV['IPADDR4'] || '127.0.0.1' @ipaddr6 = nil @@ -303,7 +306,7 @@ def normalize_level(level, newlevel, priority=nil) retlevel || 'unknown' end - def process_sys_var_log_fields(tag, time, record, fmtr_type = nil) + def process_sys_var_log_fields(tag, time, record, fmtr = nil) record['systemd'] = {"t" => {"PID" => record['pid']}, "u" => {"SYSLOG_IDENTIFIER" => record['ident']}} if record[@dest_time_name].nil? # e.g. already has @timestamp # handle the case where the time reported in /var/log/messages is for a previous year @@ -320,7 +323,7 @@ def process_sys_var_log_fields(tag, time, record, fmtr_type = nil) end end - def process_k8s_json_file_fields(tag, time, record, fmtr_type = nil) + def process_k8s_json_file_fields(tag, time, record, fmtr = nil) record['message'] = record['message'] || record['log'] record['level'] = normalize_level(record['level'], nil) if record.key?('kubernetes') && record['kubernetes'].respond_to?(:fetch) && \ @@ -339,7 +342,7 @@ def process_k8s_json_file_fields(tag, time, record, fmtr_type = nil) end record['time'] = rectime.utc.to_datetime.rfc3339(6) end - transform_eventrouter(tag, record) + transform_eventrouter(tag, record, fmtr) end def check_for_match_and_format(tag, time, record) @@ -355,7 +358,7 @@ def check_for_match_and_format(tag, time, record) return end end - fmtr.fmtr_func.call(tag, time, record, fmtr.fmtr_type) + fmtr.fmtr_func.call(tag, time, record, fmtr) if record[@dest_time_name].nil? && record['time'].nil? record['time'] = Time.at(time).utc.to_datetime.rfc3339(6) @@ -449,8 +452,8 @@ def add_elasticsearch_index_name_field(tag, time, record) end end - def transform_eventrouter(tag, record) - return unless @process_kubernetes_events + def transform_eventrouter(tag, record, fmtr) + return if fmtr.nil? || !fmtr.process_kubernetes_events if record.key?("event") && record["event"].respond_to?(:key?) if record.key?("verb") record["event"]["verb"] = record.delete("verb") diff --git a/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/lib/fluent/plugin/filter_viaq_data_model_systemd.rb b/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/lib/fluent/plugin/filter_viaq_data_model_systemd.rb index 9d1add1ec..3c639b2c5 100644 --- a/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/lib/fluent/plugin/filter_viaq_data_model_systemd.rb +++ b/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/lib/fluent/plugin/filter_viaq_data_model_systemd.rb @@ -68,7 +68,7 @@ module ViaqDataModelFilterSystemd JOURNAL_TIME_FIELDS = ['_SOURCE_REALTIME_TIMESTAMP', '__REALTIME_TIMESTAMP'] - def process_journal_fields(tag, time, record, fmtr_type) + def process_journal_fields(tag, time, record, fmtr) systemd_t = {} JOURNAL_FIELD_MAP_SYSTEMD_T.each do |jkey, key| if record.key?(jkey) @@ -104,7 +104,7 @@ def process_journal_fields(tag, time, record, fmtr_type) break end end - case fmtr_type + case fmtr.type when :sys_journal record['message'] = record['MESSAGE'] if record['_HOSTNAME'].eql?('localhost') && @docker_hostname @@ -136,7 +136,7 @@ def process_journal_fields(tag, time, record, fmtr_type) else record['hostname'] = record['_HOSTNAME'] end - transform_eventrouter(tag, record) + transform_eventrouter(tag, record, fmtr) end end end diff --git a/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/test/test_filter_viaq_data_model.rb b/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/test/test_filter_viaq_data_model.rb index 539968f2d..78adce397 100644 --- a/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/test/test_filter_viaq_data_model.rb +++ b/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/test/test_filter_viaq_data_model.rb @@ -884,6 +884,37 @@ def add_event(input) dellist = 'host,pid,ident,event,verb'.split(',') dellist.each{|field| assert_nil(rec[field])} end + test 'process a k8s json-file record with event from eventrouter, per formatter setting' do + ENV['IPADDR4'] = '127.0.0.1' + ENV['IPADDR6'] = '::1' + ENV['FLUENTD_VERSION'] = 'fversion' + ENV['DATA_VERSION'] = 'dversion' + input = {'kubernetes'=>{'host'=>'k8shost'},'stream'=>'stderr','time'=>@timestamp_str,'log'=>'mymessage'} + input = add_event(input) + rec = emit_with_tag('kubernetes.var.log.containers.name.name_this_that_other_log', input, ' + + tag "kubernetes.var.log.containers**" + type k8s_json_file + remove_keys log,stream + process_kubernetes_events true + + pipeline_type collector + process_kubernetes_events false + ') + assert_equal('ADDED', rec['kubernetes']['event']['verb']) + assert_equal('event message', rec['message']) + assert_equal('mymessage', rec['pipeline_metadata']['collector']['original_raw_message']) + assert_equal('unknown', rec['level']) + assert_equal('2017-07-27T17:23:46.216527+00:00', rec['@timestamp']) + assert_equal('127.0.0.1', rec['pipeline_metadata']['collector']['ipaddr4']) + assert_equal('::1', rec['pipeline_metadata']['collector']['ipaddr6']) + assert_equal('fluent-plugin-systemd', rec['pipeline_metadata']['collector']['inputname']) + assert_equal('fluentd', rec['pipeline_metadata']['collector']['name']) + assert_equal('fversion dversion', rec['pipeline_metadata']['collector']['version']) + assert_equal(@timestamp_str, rec['pipeline_metadata']['collector']['received_at']) + dellist = 'host,pid,ident,event,verb'.split(',') + dellist.each{|field| assert_nil(rec[field])} + end test 'process a k8s json-file record with a string valued timestamp' do ENV['IPADDR4'] = '127.0.0.1' ENV['IPADDR6'] = '::1' diff --git a/test/eventrouter.sh b/test/eventrouter.sh index 44174225c..794475392 100755 --- a/test/eventrouter.sh +++ b/test/eventrouter.sh @@ -67,7 +67,7 @@ cleanup() { oc logs $evpod > $ARTIFACT_DIR/$evpod.log 2>&1 fi fi - # turn off fluentd eventrouter mode + # remove TRANSFORM_EVENTS stop_fluentd "" $FLUENTD_WAIT_TIME 2>&1 | artifact_out oc set env $fluentd_ds TRANSFORM_EVENTS- 2>&1 | artifact_out start_fluentd false $FLUENTD_WAIT_TIME 2>&1 | artifact_out @@ -80,11 +80,6 @@ cleanup() { } trap "cleanup" EXIT -# put fluentd in eventrouter mode -stop_fluentd "" $FLUENTD_WAIT_TIME 2>&1 | artifact_out -oc set env $fluentd_ds TRANSFORM_EVENTS=true 2>&1 | artifact_out -start_fluentd false $FLUENTD_WAIT_TIME 2>&1 | artifact_out - deploy_eventrouter evpod=$( get_running_pod eventrouter ) if [ -z "$evpod" ]; then @@ -164,3 +159,41 @@ if ! os::cmd::try_until_text "curl_es $esopssvc /.operations.*/_count?q=kubernet exit 1 fi +# disable eventrouter mode +stop_fluentd "" $FLUENTD_WAIT_TIME 2>&1 | artifact_out +oc set env $fluentd_ds TRANSFORM_EVENTS=false 2>&1 | artifact_out +start_fluentd false $FLUENTD_WAIT_TIME 2>&1 | artifact_out + +oc apply -f - < $ARTIFACT_DIR/info-search-2.json 2>&1 || : + curl_es $esopssvc /.operations.*/_search?q=kubernetes.event.metadata.name:2eventroutertest2\&pretty > $ARTIFACT_DIR/info-search-3.json 2>&1 || : + exit 1 +fi