-
Notifications
You must be signed in to change notification settings - Fork 35
Feat: ECS compatibility (added headers_target and attachments_target) #55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
608b824
b985b3a
fbdc5df
70ba899
bac62de
5927108
e5530a7
a65d8c2
96f3ba7
a318b09
c002ec7
df0e436
7ac861b
0e35498
cfba076
3196ff4
64253a8
dd381bf
79a5eae
bfcb44f
68e92f9
b345e0e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -3,13 +3,22 @@ | |||||
| require "logstash/namespace" | ||||||
| require "logstash/timestamp" | ||||||
| require "stud/interval" | ||||||
| require "socket" # for Socket.gethostname | ||||||
| require 'fileutils' | ||||||
|
|
||||||
| require 'logstash/plugin_mixins/ecs_compatibility_support' | ||||||
| require 'logstash/plugin_mixins/ecs_compatibility_support/target_check' | ||||||
| require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter' | ||||||
|
|
||||||
| # Read mails from IMAP server | ||||||
| # | ||||||
| # Periodically scan an IMAP folder (`INBOX` by default) and move any read messages | ||||||
| # to the trash. | ||||||
| class LogStash::Inputs::IMAP < LogStash::Inputs::Base | ||||||
|
|
||||||
| include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1) | ||||||
|
|
||||||
| extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter | ||||||
|
|
||||||
| config_name "imap" | ||||||
|
|
||||||
| default :codec, "plain" | ||||||
|
|
@@ -24,15 +33,23 @@ class LogStash::Inputs::IMAP < LogStash::Inputs::Base | |||||
|
|
||||||
| config :folder, :validate => :string, :default => 'INBOX' | ||||||
| config :fetch_count, :validate => :number, :default => 50 | ||||||
| config :lowercase_headers, :validate => :boolean, :default => true | ||||||
| config :check_interval, :validate => :number, :default => 300 | ||||||
|
|
||||||
| config :lowercase_headers, :validate => :boolean, :default => true | ||||||
|
|
||||||
| config :headers_target, :validate => :field_reference # ECS default: [@metadata][input][imap][headers] | ||||||
|
|
||||||
| config :delete, :validate => :boolean, :default => false | ||||||
| config :expunge, :validate => :boolean, :default => false | ||||||
|
|
||||||
| config :strip_attachments, :validate => :boolean, :default => false | ||||||
| config :save_attachments, :validate => :boolean, :default => false | ||||||
|
|
||||||
| # For multipart messages, use the first part that has this | ||||||
| # content-type as the event message. | ||||||
| # Legacy default: [attachments] | ||||||
| # ECS default: [@metadata][input][imap][attachments] | ||||||
| config :attachments_target, :validate => :field_reference | ||||||
|
|
||||||
| # For multipart messages, use the first part that has this content-type as the event message. | ||||||
| config :content_type, :validate => :string, :default => "text/plain" | ||||||
|
|
||||||
| # Whether to use IMAP uid to track last processed message | ||||||
|
|
@@ -41,6 +58,28 @@ class LogStash::Inputs::IMAP < LogStash::Inputs::Base | |||||
| # Path to file with last run time metadata | ||||||
| config :sincedb_path, :validate => :string, :required => false | ||||||
|
|
||||||
| def initialize(*params) | ||||||
| super | ||||||
|
|
||||||
| if original_params.include?('headers_target') | ||||||
| @headers_target = normalize_field_ref(@headers_target) | ||||||
| else | ||||||
| @headers_target = '[@metadata][input][imap][headers]' if ecs_compatibility != :disabled | ||||||
| end | ||||||
|
|
||||||
| if original_params.include?('attachments_target') | ||||||
| @attachments_target = normalize_field_ref(@attachments_target) | ||||||
| else | ||||||
| @attachments_target = ecs_compatibility != :disabled ? '[@metadata][input][imap][attachments]' : '[attachments]' | ||||||
| end | ||||||
| end | ||||||
|
|
||||||
| def normalize_field_ref(target) | ||||||
| # so we can later event.set("#{target}[#{name}]", ...) | ||||||
| target.match?(/\A[^\[\]]+\z/) ? "[#{target}]" : target | ||||||
| end | ||||||
| private :normalize_field_ref | ||||||
|
|
||||||
| def register | ||||||
| require "net/imap" # in stdlib | ||||||
| require "mail" # gem 'mail' | ||||||
|
|
@@ -63,14 +102,16 @@ def register | |||||
| # Ensure that the filepath exists before writing, since it's deeply nested. | ||||||
| FileUtils::mkdir_p datapath | ||||||
| @sincedb_path = File.join(datapath, ".sincedb_" + Digest::MD5.hexdigest("#{@user}_#{@host}_#{@port}_#{@folder}")) | ||||||
| @logger.debug? && @logger.debug("Generated sincedb path", sincedb_path: @sincedb_path) | ||||||
| end | ||||||
| if File.directory?(@sincedb_path) | ||||||
| raise ArgumentError.new("The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\"") | ||||||
| end | ||||||
| @logger.info("Using \"sincedb_path\": \"#{@sincedb_path}\"") | ||||||
|
yaauie marked this conversation as resolved.
|
||||||
|
|
||||||
| if File.exist?(@sincedb_path) | ||||||
| if File.directory?(@sincedb_path) | ||||||
| raise ArgumentError.new("The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\"") | ||||||
| end | ||||||
| @logger.debug? && @logger.debug("Found existing sincedb path", sincedb_path: @sincedb_path) | ||||||
| @uid_last_value = File.read(@sincedb_path).to_i | ||||||
| @logger.info("Loading \"uid_last_value\": \"#{@uid_last_value}\"") | ||||||
| @logger.debug? && @logger.debug("Loaded from sincedb", uid_last_value: @uid_last_value) | ||||||
| end | ||||||
|
|
||||||
| @content_type_re = Regexp.new("^" + @content_type) | ||||||
|
|
@@ -145,7 +186,7 @@ def check_mail(queue) | |||||
| # Always save @uid_last_value so when tracking is switched from | ||||||
| # "NOT SEEN" to "UID" we will continue from first unprocessed message | ||||||
| if @uid_last_value | ||||||
| @logger.info("Saving \"uid_last_value\": \"#{@uid_last_value}\"") | ||||||
| @logger.debug? && @logger.debug("Saving to sincedb", uid_last_value: @uid_last_value) | ||||||
| File.write(@sincedb_path, @uid_last_value) | ||||||
| end | ||||||
| end | ||||||
|
|
@@ -164,7 +205,8 @@ def parse_attachments(mail) | |||||
|
|
||||||
| def parse_mail(mail) | ||||||
| # Add a debug message so we can track what message might cause an error later | ||||||
| @logger.debug? && @logger.debug("Working with message_id", :message_id => mail.message_id) | ||||||
| @logger.debug? && @logger.debug("Processing mail", message_id: mail.message_id) | ||||||
|
|
||||||
| # TODO(sissel): What should a multipart message look like as an event? | ||||||
| # For now, just take the plain-text part and set it as the message. | ||||||
| if mail.parts.count == 0 | ||||||
|
|
@@ -186,32 +228,35 @@ def parse_mail(mail) | |||||
| # Add fields: Add message.header_fields { |h| h.name=> h.value } | ||||||
| mail.header_fields.each do |header| | ||||||
| # 'header.name' can sometimes be a Mail::Multibyte::Chars, get it in String form | ||||||
| name = @lowercase_headers ? header.name.to_s.downcase : header.name.to_s | ||||||
| name = header.name.to_s | ||||||
|
|
||||||
| # Assume we already processed the 'date' above. | ||||||
| next if name == "Date" | ||||||
|
|
||||||
| name = name.downcase if @lowercase_headers | ||||||
|
|
||||||
| # Call .decoded on the header in case it's in encoded-word form. | ||||||
| # Details at: | ||||||
| # https://github.com/mikel/mail/blob/master/README.md#encodings | ||||||
| # http://tools.ietf.org/html/rfc2047#section-2 | ||||||
| value = transcode_to_utf8(header.decoded.to_s) | ||||||
|
|
||||||
| # Assume we already processed the 'date' above. | ||||||
| next if name == "Date" | ||||||
|
|
||||||
| case (field = event.get(name)) | ||||||
| targeted_name = "#{@headers_target}[#{name}]" | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Example: pre-normalizing the
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks, would still rather keep the 'canonical' form here - subjectively find it less confusing than the composite. it's also clearer this way to handle the case when headers_target is nil ->
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this brings up a good question though, when user explicitly sets a or does this sound confusing?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that setting When the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
✔️ (pending 6.x compatibility there's one extra change since last review - not skipping the
so while it looks like a bug the "fix" (to properly skip the header regardless or |
||||||
| case (field = event.get(targeted_name)) | ||||||
| when String | ||||||
| # promote string to array if a header appears multiple times | ||||||
| # (like 'received') | ||||||
| event.set(name, [field, value]) | ||||||
| # promote string to array if a header appears multiple times (like 'received') | ||||||
| event.set(targeted_name, [field, value]) | ||||||
| when Array | ||||||
| field << value | ||||||
| event.set(name, field) | ||||||
| event.set(targeted_name, field) | ||||||
| when nil | ||||||
| event.set(name, value) | ||||||
| event.set(targeted_name, value) | ||||||
| end | ||||||
| end | ||||||
|
|
||||||
| # Add attachments | ||||||
| if attachments && attachments.length > 0 | ||||||
| event.set('attachments', attachments) | ||||||
| event.set(@attachments_target, attachments) | ||||||
| end | ||||||
|
|
||||||
| decorate(event) | ||||||
|
|
@@ -221,7 +266,6 @@ def parse_mail(mail) | |||||
|
|
||||||
| def stop | ||||||
| Stud.stop!(@run_thread) | ||||||
| $stdin.close | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 😬 whoops. Good catch. |
||||||
| end | ||||||
|
|
||||||
| private | ||||||
|
|
||||||
Uh oh!
There was an error while loading. Please reload this page.