Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions fluentd/configs.d/openshift/filter-viaq-data-model.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@
type k8s_journal
remove_keys "#{ENV['K8S_FILTER_REMOVE_KEYS'] || 'log,stream,MESSAGE,_SOURCE_REALTIME_TIMESTAMP,__REALTIME_TIMESTAMP,CONTAINER_ID,CONTAINER_ID_FULL,CONTAINER_NAME,PRIORITY,_BOOT_ID,_CAP_EFFECTIVE,_CMDLINE,_COMM,_EXE,_GID,_HOSTNAME,_MACHINE_ID,_PID,_SELINUX_CONTEXT,_SYSTEMD_CGROUP,_SYSTEMD_SLICE,_SYSTEMD_UNIT,_TRANSPORT,_UID,_AUDIT_LOGINUID,_AUDIT_SESSION,_SYSTEMD_OWNER_UID,_SYSTEMD_SESSION,_SYSTEMD_USER_UNIT,CODE_FILE,CODE_FUNCTION,CODE_LINE,ERRNO,MESSAGE_ID,RESULT,UNIT,_KERNEL_DEVICE,_KERNEL_SUBSYSTEM,_UDEV_SYSNAME,_UDEV_DEVNODE,_UDEV_DEVLINK,SYSLOG_FACILITY,SYSLOG_IDENTIFIER,SYSLOG_PID'}"
</formatter>
<formatter>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to pull this into the CLO when openshift/cluster-logging-operator#231 merges as it intentionally pulls all these configs into the conf generation instead of including these files.

tag "kubernetes.var.log.containers.eventrouter-** kubernetes.var.log.containers.cluster-logging-eventrouter-**"
type k8s_json_file
remove_keys log,stream,CONTAINER_ID_FULL,CONTAINER_NAME
process_kubernetes_events "#{ENV['TRANSFORM_EVENTS'] || 'true'}"
</formatter>
<formatter>
tag "kubernetes.var.log.containers**"
type k8s_json_file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)

Gem::Specification.new do |gem|
gem.name = "fluent-plugin-viaq_data_model"
gem.version = "0.0.20"
gem.version = "0.0.21"
gem.authors = ["Rich Megginson"]
gem.email = ["rmeggins@redhat.com"]
gem.description = %q{Filter plugin to ensure data is in the ViaQ common data model}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ class ViaqDataModelFilter < Filter
config_param :tag, :string
desc 'remove these keys from the record - same as record_transformer "remove_keys" field'
config_param :remove_keys, :string, default: nil
desc 'enable/disable processing of kubernetes events'
config_param :process_kubernetes_events, :bool, default: nil
end

desc 'Which part of the pipeline is this - collector, normalizer, etc. for pipeline_metadata'
Expand Down Expand Up @@ -178,7 +180,6 @@ def configure(conf)
@formatters.each do |fmtr|
matcher = ViaqMatchClass.new(fmtr.tag, nil)
fmtr.instance_eval{ @params[:matcher] = matcher }
fmtr.instance_eval{ @params[:fmtr_type] = fmtr.type }
if fmtr.remove_keys
fmtr.instance_eval{ @params[:fmtr_remove_keys] = fmtr.remove_keys.split(',') }
else
Expand All @@ -193,14 +194,16 @@ def configure(conf)
fmtr_func = method(:process_k8s_json_file_fields)
end
fmtr.instance_eval{ @params[:fmtr_func] = fmtr_func }
proc_k8s_ev = fmtr.process_kubernetes_events.nil? ? @process_kubernetes_events : fmtr.process_kubernetes_events
fmtr.instance_eval{ @params[:process_kubernetes_events] = proc_k8s_ev }
end
@formatter_cache = {}
@formatter_cache_nomatch = {}
end
begin
@docker_hostname = File.open('/etc/docker-hostname') { |f| f.readline }.rstrip
rescue
@docker_hostname = nil
@docker_hostname = ENV['NODE_NAME'] || nil
end
@ipaddr4 = ENV['IPADDR4'] || '127.0.0.1'
@ipaddr6 = nil
Expand Down Expand Up @@ -303,7 +306,7 @@ def normalize_level(level, newlevel, priority=nil)
retlevel || 'unknown'
end

def process_sys_var_log_fields(tag, time, record, fmtr_type = nil)
def process_sys_var_log_fields(tag, time, record, fmtr = nil)
record['systemd'] = {"t" => {"PID" => record['pid']}, "u" => {"SYSLOG_IDENTIFIER" => record['ident']}}
if record[@dest_time_name].nil? # e.g. already has @timestamp
# handle the case where the time reported in /var/log/messages is for a previous year
Expand All @@ -320,7 +323,7 @@ def process_sys_var_log_fields(tag, time, record, fmtr_type = nil)
end
end

def process_k8s_json_file_fields(tag, time, record, fmtr_type = nil)
def process_k8s_json_file_fields(tag, time, record, fmtr = nil)
record['message'] = record['message'] || record['log']
record['level'] = normalize_level(record['level'], nil)
if record.key?('kubernetes') && record['kubernetes'].respond_to?(:fetch) && \
Expand All @@ -339,7 +342,7 @@ def process_k8s_json_file_fields(tag, time, record, fmtr_type = nil)
end
record['time'] = rectime.utc.to_datetime.rfc3339(6)
end
transform_eventrouter(tag, record)
transform_eventrouter(tag, record, fmtr)
end

def check_for_match_and_format(tag, time, record)
Expand All @@ -355,7 +358,7 @@ def check_for_match_and_format(tag, time, record)
return
end
end
fmtr.fmtr_func.call(tag, time, record, fmtr.fmtr_type)
fmtr.fmtr_func.call(tag, time, record, fmtr)

if record[@dest_time_name].nil? && record['time'].nil?
record['time'] = Time.at(time).utc.to_datetime.rfc3339(6)
Expand Down Expand Up @@ -449,8 +452,8 @@ def add_elasticsearch_index_name_field(tag, time, record)
end
end

def transform_eventrouter(tag, record)
return unless @process_kubernetes_events
def transform_eventrouter(tag, record, fmtr)
return if fmtr.nil? || !fmtr.process_kubernetes_events
if record.key?("event") && record["event"].respond_to?(:key?)
if record.key?("verb")
record["event"]["verb"] = record.delete("verb")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ module ViaqDataModelFilterSystemd

JOURNAL_TIME_FIELDS = ['_SOURCE_REALTIME_TIMESTAMP', '__REALTIME_TIMESTAMP']

def process_journal_fields(tag, time, record, fmtr_type)
def process_journal_fields(tag, time, record, fmtr)
systemd_t = {}
JOURNAL_FIELD_MAP_SYSTEMD_T.each do |jkey, key|
if record.key?(jkey)
Expand Down Expand Up @@ -104,7 +104,7 @@ def process_journal_fields(tag, time, record, fmtr_type)
break
end
end
case fmtr_type
case fmtr.type
when :sys_journal
record['message'] = record['MESSAGE']
if record['_HOSTNAME'].eql?('localhost') && @docker_hostname
Expand Down Expand Up @@ -136,7 +136,7 @@ def process_journal_fields(tag, time, record, fmtr_type)
else
record['hostname'] = record['_HOSTNAME']
end
transform_eventrouter(tag, record)
transform_eventrouter(tag, record, fmtr)
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,37 @@ def add_event(input)
dellist = 'host,pid,ident,event,verb'.split(',')
dellist.each{|field| assert_nil(rec[field])}
end
# Verifies that the per-<formatter> `process_kubernetes_events` setting
# overrides the plugin-wide setting: the top-level config below disables event
# processing (`process_kubernetes_events false`), but the matching formatter
# re-enables it, so the eventrouter payload must still be transformed into the
# ViaQ data model (verb/message hoisted, original message preserved under
# pipeline_metadata).
test 'process a k8s json-file record with event from eventrouter, per formatter setting' do
# Fixed env values so the collector metadata in the output is deterministic.
ENV['IPADDR4'] = '127.0.0.1'
ENV['IPADDR6'] = '::1'
ENV['FLUENTD_VERSION'] = 'fversion'
ENV['DATA_VERSION'] = 'dversion'
input = {'kubernetes'=>{'host'=>'k8shost'},'stream'=>'stderr','time'=>@timestamp_str,'log'=>'mymessage'}
# add_event wraps the record with an eventrouter-style "event" payload
# (verb ADDED, message 'event message') — helper defined elsewhere in this file.
input = add_event(input)
rec = emit_with_tag('kubernetes.var.log.containers.name.name_this_that_other_log', input, '
<formatter>
tag "kubernetes.var.log.containers**"
type k8s_json_file
remove_keys log,stream
process_kubernetes_events true
</formatter>
pipeline_type collector
process_kubernetes_events false
')
# Event was transformed despite the global `false`: verb moved under
# kubernetes.event, event message promoted to the top-level message, and the
# raw container log preserved for the collector stage.
assert_equal('ADDED', rec['kubernetes']['event']['verb'])
assert_equal('event message', rec['message'])
assert_equal('mymessage', rec['pipeline_metadata']['collector']['original_raw_message'])
assert_equal('unknown', rec['level'])
assert_equal('2017-07-27T17:23:46.216527+00:00', rec['@timestamp'])
assert_equal('127.0.0.1', rec['pipeline_metadata']['collector']['ipaddr4'])
assert_equal('::1', rec['pipeline_metadata']['collector']['ipaddr6'])
assert_equal('fluent-plugin-systemd', rec['pipeline_metadata']['collector']['inputname'])
assert_equal('fluentd', rec['pipeline_metadata']['collector']['name'])
assert_equal('fversion dversion', rec['pipeline_metadata']['collector']['version'])
assert_equal(@timestamp_str, rec['pipeline_metadata']['collector']['received_at'])
# Consumed top-level fields must have been removed from the record.
dellist = 'host,pid,ident,event,verb'.split(',')
dellist.each{|field| assert_nil(rec[field])}
end
test 'process a k8s json-file record with a string valued timestamp' do
ENV['IPADDR4'] = '127.0.0.1'
ENV['IPADDR6'] = '::1'
Expand Down
45 changes: 39 additions & 6 deletions test/eventrouter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ cleanup() {
oc logs $evpod > $ARTIFACT_DIR/$evpod.log 2>&1
fi
fi
# turn off fluentd eventrouter mode
# remove TRANSFORM_EVENTS
stop_fluentd "" $FLUENTD_WAIT_TIME 2>&1 | artifact_out
oc set env $fluentd_ds TRANSFORM_EVENTS- 2>&1 | artifact_out
start_fluentd false $FLUENTD_WAIT_TIME 2>&1 | artifact_out
Expand All @@ -80,11 +80,6 @@ cleanup() {
}
trap "cleanup" EXIT

# put fluentd in eventrouter mode
stop_fluentd "" $FLUENTD_WAIT_TIME 2>&1 | artifact_out
oc set env $fluentd_ds TRANSFORM_EVENTS=true 2>&1 | artifact_out
start_fluentd false $FLUENTD_WAIT_TIME 2>&1 | artifact_out

deploy_eventrouter
evpod=$( get_running_pod eventrouter )
if [ -z "$evpod" ]; then
Expand Down Expand Up @@ -164,3 +159,41 @@ if ! os::cmd::try_until_text "curl_es $esopssvc /.operations.*/_count?q=kubernet
exit 1
fi

# disable eventrouter mode
# Restart fluentd with TRANSFORM_EVENTS=false so eventrouter records are NOT
# rewritten into the kubernetes.event.* form. stop_fluentd/start_fluentd and
# artifact_out are test-harness helpers defined elsewhere in this suite.
stop_fluentd "" $FLUENTD_WAIT_TIME 2>&1 | artifact_out
oc set env $fluentd_ds TRANSFORM_EVENTS=false 2>&1 | artifact_out
start_fluentd false $FLUENTD_WAIT_TIME 2>&1 | artifact_out

# Create a synthetic Event object (name 2eventroutertest2) directly via the
# API; eventrouter will pick it up and log it, and fluentd should pass the
# resulting record through untransformed.
# NOTE(review): "type": "Info" is not one of the conventional Event types
# (Normal/Warning) — confirm this is intentional / accepted by the apiserver.
oc apply -f - <<EOF
{
    "apiVersion": "v1",
    "count": 1,
    "eventTime": null,
    "involvedObject": {
        "apiVersion": "apps.openshift.io/v1",
        "kind": "DeploymentConfig",
        "name": "2eventroutertest2",
        "namespace": "default"
    },
    "kind": "Event",
    "message": "2eventroutertest2",
    "metadata": {
        "name": "2eventroutertest2",
        "namespace": "default"
    },
    "reason": "DeploymentCreated",
    "reportingComponent": "",
    "reportingInstance": "",
    "source": {
        "component": "deploymentconfig-controller"
    },
    "type": "Info"
}
EOF

# With transformation disabled, the record should be indexed with its original
# top-level "event" field (event.metadata.name) rather than under
# kubernetes.event.*. If not found there, dump both search results for
# debugging and fail.
if ! os::cmd::try_until_text "curl_es $esopssvc /.operations.*/_count?q=event.metadata.name:2eventroutertest2 | get_count_from_json" "^1\$" $FLUENTD_WAIT_TIME ; then
    os::log::error the event 2eventroutertest2 was processed as a kubernetes event even though TRANSFORM_EVENTS=false
    curl_es $esopssvc /.operations.*/_search?q=event.metadata.name:2eventroutertest2\&pretty > $ARTIFACT_DIR/info-search-2.json 2>&1 || :
    curl_es $esopssvc /.operations.*/_search?q=kubernetes.event.metadata.name:2eventroutertest2\&pretty > $ARTIFACT_DIR/info-search-3.json 2>&1 || :
    exit 1
fi