diff --git a/fluentd/configs.d/openshift/filter-viaq-data-model.conf b/fluentd/configs.d/openshift/filter-viaq-data-model.conf
index 70dae3a29..bfd079c5b 100644
--- a/fluentd/configs.d/openshift/filter-viaq-data-model.conf
+++ b/fluentd/configs.d/openshift/filter-viaq-data-model.conf
@@ -34,6 +34,12 @@
type k8s_journal
remove_keys "#{ENV['K8S_FILTER_REMOVE_KEYS'] || 'log,stream,MESSAGE,_SOURCE_REALTIME_TIMESTAMP,__REALTIME_TIMESTAMP,CONTAINER_ID,CONTAINER_ID_FULL,CONTAINER_NAME,PRIORITY,_BOOT_ID,_CAP_EFFECTIVE,_CMDLINE,_COMM,_EXE,_GID,_HOSTNAME,_MACHINE_ID,_PID,_SELINUX_CONTEXT,_SYSTEMD_CGROUP,_SYSTEMD_SLICE,_SYSTEMD_UNIT,_TRANSPORT,_UID,_AUDIT_LOGINUID,_AUDIT_SESSION,_SYSTEMD_OWNER_UID,_SYSTEMD_SESSION,_SYSTEMD_USER_UNIT,CODE_FILE,CODE_FUNCTION,CODE_LINE,ERRNO,MESSAGE_ID,RESULT,UNIT,_KERNEL_DEVICE,_KERNEL_SUBSYSTEM,_UDEV_SYSNAME,_UDEV_DEVNODE,_UDEV_DEVLINK,SYSLOG_FACILITY,SYSLOG_IDENTIFIER,SYSLOG_PID'}"
+
+ tag "kubernetes.var.log.containers.eventrouter-** kubernetes.var.log.containers.cluster-logging-eventrouter-**"
+ type k8s_json_file
+ remove_keys log,stream,CONTAINER_ID_FULL,CONTAINER_NAME
+ process_kubernetes_events "#{ENV['TRANSFORM_EVENTS'] || 'true'}"
+
tag "kubernetes.var.log.containers**"
type k8s_json_file
diff --git a/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/fluent-plugin-viaq_data_model.gemspec b/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/fluent-plugin-viaq_data_model.gemspec
index 9206da2c7..fad8f17be 100644
--- a/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/fluent-plugin-viaq_data_model.gemspec
+++ b/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/fluent-plugin-viaq_data_model.gemspec
@@ -4,7 +4,7 @@ $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
Gem::Specification.new do |gem|
gem.name = "fluent-plugin-viaq_data_model"
- gem.version = "0.0.20"
+ gem.version = "0.0.21"
gem.authors = ["Rich Megginson"]
gem.email = ["rmeggins@redhat.com"]
gem.description = %q{Filter plugin to ensure data is in the ViaQ common data model}
diff --git a/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/lib/fluent/plugin/filter_viaq_data_model.rb b/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/lib/fluent/plugin/filter_viaq_data_model.rb
index b9c579956..e3b468edf 100644
--- a/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/lib/fluent/plugin/filter_viaq_data_model.rb
+++ b/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/lib/fluent/plugin/filter_viaq_data_model.rb
@@ -119,6 +119,8 @@ class ViaqDataModelFilter < Filter
config_param :tag, :string
desc 'remove these keys from the record - same as record_transformer "remove_keys" field'
config_param :remove_keys, :string, default: nil
+ desc 'enable/disable processing of kubernetes events'
+ config_param :process_kubernetes_events, :bool, default: nil
end
desc 'Which part of the pipeline is this - collector, normalizer, etc. for pipeline_metadata'
@@ -178,7 +180,6 @@ def configure(conf)
@formatters.each do |fmtr|
matcher = ViaqMatchClass.new(fmtr.tag, nil)
fmtr.instance_eval{ @params[:matcher] = matcher }
- fmtr.instance_eval{ @params[:fmtr_type] = fmtr.type }
if fmtr.remove_keys
fmtr.instance_eval{ @params[:fmtr_remove_keys] = fmtr.remove_keys.split(',') }
else
@@ -193,6 +194,8 @@ def configure(conf)
fmtr_func = method(:process_k8s_json_file_fields)
end
fmtr.instance_eval{ @params[:fmtr_func] = fmtr_func }
+ proc_k8s_ev = fmtr.process_kubernetes_events.nil? ? @process_kubernetes_events : fmtr.process_kubernetes_events
+ fmtr.instance_eval{ @params[:process_kubernetes_events] = proc_k8s_ev }
end
@formatter_cache = {}
@formatter_cache_nomatch = {}
@@ -200,7 +203,7 @@ def configure(conf)
begin
@docker_hostname = File.open('/etc/docker-hostname') { |f| f.readline }.rstrip
rescue
- @docker_hostname = nil
+ @docker_hostname = ENV['NODE_NAME'] || nil
end
@ipaddr4 = ENV['IPADDR4'] || '127.0.0.1'
@ipaddr6 = nil
@@ -303,7 +306,7 @@ def normalize_level(level, newlevel, priority=nil)
retlevel || 'unknown'
end
- def process_sys_var_log_fields(tag, time, record, fmtr_type = nil)
+ def process_sys_var_log_fields(tag, time, record, fmtr = nil)
record['systemd'] = {"t" => {"PID" => record['pid']}, "u" => {"SYSLOG_IDENTIFIER" => record['ident']}}
if record[@dest_time_name].nil? # e.g. already has @timestamp
# handle the case where the time reported in /var/log/messages is for a previous year
@@ -320,7 +323,7 @@ def process_sys_var_log_fields(tag, time, record, fmtr_type = nil)
end
end
- def process_k8s_json_file_fields(tag, time, record, fmtr_type = nil)
+ def process_k8s_json_file_fields(tag, time, record, fmtr = nil)
record['message'] = record['message'] || record['log']
record['level'] = normalize_level(record['level'], nil)
if record.key?('kubernetes') && record['kubernetes'].respond_to?(:fetch) && \
@@ -339,7 +342,7 @@ def process_k8s_json_file_fields(tag, time, record, fmtr_type = nil)
end
record['time'] = rectime.utc.to_datetime.rfc3339(6)
end
- transform_eventrouter(tag, record)
+ transform_eventrouter(tag, record, fmtr)
end
def check_for_match_and_format(tag, time, record)
@@ -355,7 +358,7 @@ def check_for_match_and_format(tag, time, record)
return
end
end
- fmtr.fmtr_func.call(tag, time, record, fmtr.fmtr_type)
+ fmtr.fmtr_func.call(tag, time, record, fmtr)
if record[@dest_time_name].nil? && record['time'].nil?
record['time'] = Time.at(time).utc.to_datetime.rfc3339(6)
@@ -449,8 +452,8 @@ def add_elasticsearch_index_name_field(tag, time, record)
end
end
- def transform_eventrouter(tag, record)
- return unless @process_kubernetes_events
+ def transform_eventrouter(tag, record, fmtr)
+ return if fmtr.nil? || !fmtr.process_kubernetes_events
if record.key?("event") && record["event"].respond_to?(:key?)
if record.key?("verb")
record["event"]["verb"] = record.delete("verb")
diff --git a/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/lib/fluent/plugin/filter_viaq_data_model_systemd.rb b/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/lib/fluent/plugin/filter_viaq_data_model_systemd.rb
index 9d1add1ec..3c639b2c5 100644
--- a/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/lib/fluent/plugin/filter_viaq_data_model_systemd.rb
+++ b/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/lib/fluent/plugin/filter_viaq_data_model_systemd.rb
@@ -68,7 +68,7 @@ module ViaqDataModelFilterSystemd
JOURNAL_TIME_FIELDS = ['_SOURCE_REALTIME_TIMESTAMP', '__REALTIME_TIMESTAMP']
- def process_journal_fields(tag, time, record, fmtr_type)
+ def process_journal_fields(tag, time, record, fmtr)
systemd_t = {}
JOURNAL_FIELD_MAP_SYSTEMD_T.each do |jkey, key|
if record.key?(jkey)
@@ -104,7 +104,7 @@ def process_journal_fields(tag, time, record, fmtr_type)
break
end
end
- case fmtr_type
+ case fmtr.type
when :sys_journal
record['message'] = record['MESSAGE']
if record['_HOSTNAME'].eql?('localhost') && @docker_hostname
@@ -136,7 +136,7 @@ def process_journal_fields(tag, time, record, fmtr_type)
else
record['hostname'] = record['_HOSTNAME']
end
- transform_eventrouter(tag, record)
+ transform_eventrouter(tag, record, fmtr)
end
end
end
diff --git a/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/test/test_filter_viaq_data_model.rb b/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/test/test_filter_viaq_data_model.rb
index 539968f2d..78adce397 100644
--- a/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/test/test_filter_viaq_data_model.rb
+++ b/fluentd/vendored_gem_src/fluent-plugin-viaq_data_model/test/test_filter_viaq_data_model.rb
@@ -884,6 +884,37 @@ def add_event(input)
dellist = 'host,pid,ident,event,verb'.split(',')
dellist.each{|field| assert_nil(rec[field])}
end
+ test 'process a k8s json-file record with event from eventrouter, per formatter setting' do
+ ENV['IPADDR4'] = '127.0.0.1'
+ ENV['IPADDR6'] = '::1'
+ ENV['FLUENTD_VERSION'] = 'fversion'
+ ENV['DATA_VERSION'] = 'dversion'
+ input = {'kubernetes'=>{'host'=>'k8shost'},'stream'=>'stderr','time'=>@timestamp_str,'log'=>'mymessage'}
+ input = add_event(input)
+ rec = emit_with_tag('kubernetes.var.log.containers.name.name_this_that_other_log', input, '
+
+ tag "kubernetes.var.log.containers**"
+ type k8s_json_file
+ remove_keys log,stream
+ process_kubernetes_events true
+
+ pipeline_type collector
+ process_kubernetes_events false
+ ')
+ assert_equal('ADDED', rec['kubernetes']['event']['verb'])
+ assert_equal('event message', rec['message'])
+ assert_equal('mymessage', rec['pipeline_metadata']['collector']['original_raw_message'])
+ assert_equal('unknown', rec['level'])
+ assert_equal('2017-07-27T17:23:46.216527+00:00', rec['@timestamp'])
+ assert_equal('127.0.0.1', rec['pipeline_metadata']['collector']['ipaddr4'])
+ assert_equal('::1', rec['pipeline_metadata']['collector']['ipaddr6'])
+ assert_equal('fluent-plugin-systemd', rec['pipeline_metadata']['collector']['inputname'])
+ assert_equal('fluentd', rec['pipeline_metadata']['collector']['name'])
+ assert_equal('fversion dversion', rec['pipeline_metadata']['collector']['version'])
+ assert_equal(@timestamp_str, rec['pipeline_metadata']['collector']['received_at'])
+ dellist = 'host,pid,ident,event,verb'.split(',')
+ dellist.each{|field| assert_nil(rec[field])}
+ end
test 'process a k8s json-file record with a string valued timestamp' do
ENV['IPADDR4'] = '127.0.0.1'
ENV['IPADDR6'] = '::1'
diff --git a/test/eventrouter.sh b/test/eventrouter.sh
index 44174225c..794475392 100755
--- a/test/eventrouter.sh
+++ b/test/eventrouter.sh
@@ -67,7 +67,7 @@ cleanup() {
oc logs $evpod > $ARTIFACT_DIR/$evpod.log 2>&1
fi
fi
- # turn off fluentd eventrouter mode
+ # remove TRANSFORM_EVENTS
stop_fluentd "" $FLUENTD_WAIT_TIME 2>&1 | artifact_out
oc set env $fluentd_ds TRANSFORM_EVENTS- 2>&1 | artifact_out
start_fluentd false $FLUENTD_WAIT_TIME 2>&1 | artifact_out
@@ -80,11 +80,6 @@ cleanup() {
}
trap "cleanup" EXIT
-# put fluentd in eventrouter mode
-stop_fluentd "" $FLUENTD_WAIT_TIME 2>&1 | artifact_out
-oc set env $fluentd_ds TRANSFORM_EVENTS=true 2>&1 | artifact_out
-start_fluentd false $FLUENTD_WAIT_TIME 2>&1 | artifact_out
-
deploy_eventrouter
evpod=$( get_running_pod eventrouter )
if [ -z "$evpod" ]; then
@@ -164,3 +159,41 @@ if ! os::cmd::try_until_text "curl_es $esopssvc /.operations.*/_count?q=kubernet
exit 1
fi
+# disable eventrouter mode
+stop_fluentd "" $FLUENTD_WAIT_TIME 2>&1 | artifact_out
+oc set env $fluentd_ds TRANSFORM_EVENTS=false 2>&1 | artifact_out
+start_fluentd false $FLUENTD_WAIT_TIME 2>&1 | artifact_out
+
+oc apply -f - < $ARTIFACT_DIR/info-search-2.json 2>&1 || :
+ curl_es $esopssvc /.operations.*/_search?q=kubernetes.event.metadata.name:2eventroutertest2\&pretty > $ARTIFACT_DIR/info-search-3.json 2>&1 || :
+ exit 1
+fi