-
Notifications
You must be signed in to change notification settings - Fork 3.5k
fix database manager with multiple pipelines #12862
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
5b83225
94ee10c
222d807
8646194
1d2ab08
950fe8c
481bb22
18a9cac
4d8be13
0dd4c9d
f4cf2a5
580824b
068f9da
021d48d
c885b82
fe26d4b
0f94420
439e292
02e094e
7127201
f360013
0b0c4fb
46c3ac6
c9f2fe7
195083f
5c2c862
fb2b592
ba9bbd9
73b7a4f
59880b6
cda5900
79c15de
b9b248c
bb22ab8
dfca338
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,7 @@ | |
| require "down" | ||
| require "rufus/scheduler" | ||
| require "date" | ||
| require "concurrent" | ||
|
|
||
| # The mission of DatabaseManager is to ensure the plugin running an up-to-date MaxMind database and | ||
| # thus users are compliant with EULA. | ||
|
|
@@ -21,7 +22,7 @@ | |
| # to keep track of versions and the number of days disconnects to the endpoint. | ||
| # Once a new database version release, DownloadManager downloads it, and GeoIP Filter uses it on-the-fly. | ||
| # If the last update timestamp is 25 days ago, a warning message shows in the log; | ||
| # if it was 30 days ago, the GeoIP Filter should shutdown in order to be compliant. | ||
| # if it was 30 days ago, the GeoIP Filter should stop using EULA database in order to be compliant. | ||
| # There are online mode and offline mode in DatabaseManager. `online` is for automatic database update | ||
| # while `offline` is for static database path provided by users | ||
|
|
||
|
|
@@ -30,134 +31,193 @@ module LogStash module Filters module Geoip class DatabaseManager | |
| include LogStash::Util::Loggable | ||
| include LogStash::Filters::Geoip::Util | ||
|
|
||
| #TODO remove vendor_path | ||
| def initialize(geoip, database_path, database_type, vendor_path) | ||
| @geoip = geoip | ||
| self.class.prepare_cc_db | ||
| @mode = database_path.nil? ? :online : :offline | ||
| @database_type = database_type | ||
| @database_path = patch_database_path(database_path) | ||
|
|
||
| if @mode == :online | ||
| logger.info "By using `online` mode, you accepted and agreed MaxMind EULA. "\ | ||
| "For more details please visit https://www.maxmind.com/en/geolite2/eula" | ||
| @@instance = nil | ||
| @@instance_mutex = Mutex.new | ||
|
|
||
| setup | ||
| clean_up_database | ||
| execute_download_job | ||
| def self.instance | ||
| return @@instance if @@instance | ||
|
|
||
| # check database update periodically. trigger `call` method | ||
| @scheduler = Rufus::Scheduler.new({:max_work_threads => 1}) | ||
| @scheduler.every('24h', self) | ||
| else | ||
| logger.info "GeoIP plugin is in offline mode. Logstash points to static database files and will not check for update. "\ | ||
| "Keep in mind that if you are not using the database shipped with this plugin, "\ | ||
| "please go to https://www.maxmind.com/en/geolite2/eula to accept and agree the terms and conditions." | ||
| @@instance_mutex.synchronize do | ||
| return @@instance if @@instance | ||
| @@instance = new | ||
| end | ||
|
|
||
| @@instance | ||
| end | ||
|
|
||
| DEFAULT_DATABASE_FILENAME = %w{ | ||
| GeoLite2-City.mmdb | ||
| GeoLite2-ASN.mmdb | ||
| }.map(&:freeze).freeze | ||
| private_class_method :new | ||
|
|
||
| public | ||
| private | ||
| def initialize | ||
| self.class.prepare_cc_db | ||
|
jsvd marked this conversation as resolved.
Outdated
|
||
|
|
||
| # create data dir, path.data, for geoip if it doesn't exist | ||
| # copy CC databases to data dir | ||
| def self.prepare_cc_db | ||
| FileUtils::mkdir_p(get_data_dir) | ||
| unless ::File.exist?(get_file_path(CITY_DB_NAME)) && ::File.exist?(get_file_path(ASN_DB_NAME)) | ||
| cc_database_paths = ::Dir.glob(::File.join(LogStash::Environment::LOGSTASH_HOME, "vendor", "**", "{GeoLite2-ASN,GeoLite2-City}.mmdb")) | ||
| FileUtils.cp_r(cc_database_paths, get_data_dir) | ||
| cc_city_database_path = get_db_path(CITY, CC) | ||
| cc_asn_database_path = get_db_path(ASN, CC) | ||
|
|
||
|
jsvd marked this conversation as resolved.
|
||
| @metadata = DatabaseMetadata.new | ||
| unless @metadata.exist? | ||
| @metadata.save_metadata(CITY, CC, false) | ||
| @metadata.save_metadata(ASN, CC, false) | ||
| end | ||
|
jsvd marked this conversation as resolved.
Outdated
|
||
|
|
||
| city_database_path = @metadata.database_path(CITY) || cc_city_database_path | ||
| asn_database_path = @metadata.database_path(ASN) || cc_asn_database_path | ||
|
|
||
| # reset md5 to allow re-download when the file is gone | ||
| DB_TYPES.map { |type| @metadata.reset_md5(type) if @metadata.database_path(type).nil? } | ||
|
jsvd marked this conversation as resolved.
Outdated
|
||
|
|
||
| @states = { "#{CITY}" => DatabaseState.new(@metadata.is_eula(CITY), | ||
| Concurrent::Array.new, | ||
| city_database_path, | ||
| cc_city_database_path), | ||
| "#{ASN}" => DatabaseState.new(@metadata.is_eula(ASN), | ||
| Concurrent::Array.new, | ||
| asn_database_path, | ||
| cc_asn_database_path) } | ||
|
|
||
| @download_manager = DownloadManager.new(@metadata) | ||
|
|
||
| @trigger_download = Concurrent::AtomicBoolean.new(false) | ||
| end | ||
|
|
||
| protected | ||
| # update database path to the new download | ||
| # update timestamp when download is valid or there is no update | ||
| # do daily check and clean up | ||
| def execute_download_job | ||
| begin | ||
| has_update, new_database_path = @download_manager.fetch_database | ||
| @database_path = new_database_path if has_update | ||
| @metadata.save_timestamp(@database_path) | ||
| has_update | ||
| updated_db = @download_manager.fetch_database | ||
| updated_db.each do |database_type, valid_download, dirname, new_database_path| | ||
| if valid_download | ||
| @metadata.save_metadata(database_type, dirname, true) | ||
| @states[database_type].is_eula = true | ||
| @states[database_type].database_path = new_database_path | ||
| @states[database_type].plugins.dup.each { |plugin| plugin.setup_filter(new_database_path) if plugin } | ||
| end | ||
| end | ||
|
|
||
| updated_type = updated_db.map { |database_type, valid_download, new_database_path| database_type } | ||
| (DB_TYPES - updated_type).each { |unchange_type| @metadata.update_timestamp(unchange_type) } | ||
| rescue => e | ||
| logger.error(e.message, :cause => e.cause, :backtrace => e.backtrace) | ||
| ensure | ||
| check_age | ||
| false | ||
| clean_up_database | ||
| end | ||
| end | ||
|
|
||
| # call expiry action if database is expired and EULA | ||
| def check_age(database_types = DB_TYPES) | ||
| database_types.map do |database_type| | ||
| days_without_update = (::Date.today - ::Time.at(@metadata.updated_at(database_type)).to_date).to_i | ||
|
|
||
| case | ||
| when days_without_update >= 30 | ||
| if @states[database_type].is_eula && @states[database_type].plugins.size > 0 | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (5) this check ensures Logstash can use CC database indefinitely. we only notify plugins to do expiry action when the plugin is using EULA and age >= 30 |
||
| logger.error("The MaxMind database hasn't been updated from last 30 days. Logstash is unable to get newer version from internet. "\ | ||
| "According to EULA, GeoIP plugin needs to stop using MaxMind database in order to be compliant. "\ | ||
| "Please check the network settings and allow Logstash accesses the internet to download the latest database, "\ | ||
| "or switch to offline mode (:database => PATH_TO_YOUR_DATABASE) to use a self-managed database "\ | ||
| "which you can download from https://dev.maxmind.com/geoip/geoip2/geolite2/ ") | ||
| @states[database_type].plugins.dup.each { |plugin| plugin.expire_action if plugin } | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (4) the actual expiry logic is in plugin |
||
| end | ||
| when days_without_update >= 25 | ||
| if @states[database_type].is_eula && @states[database_type].plugins.size > 0 | ||
| logger.warn("The MaxMind database hasn't been updated for last #{days_without_update} days. "\ | ||
| "Logstash will fail the GeoIP plugin in #{30 - days_without_update} days. "\ | ||
| "Please check the network settings and allow Logstash accesses the internet to download the latest database ") | ||
| end | ||
| else | ||
| logger.trace("The endpoint hasn't updated", :days_without_update => days_without_update) | ||
| end | ||
| end | ||
| end | ||
|
jsvd marked this conversation as resolved.
|
||
|
|
||
| # Clean up files .mmdb, .tgz which are not mentioned in metadata and not default database | ||
| def clean_up_database | ||
| protected_dirnames = (@metadata.dirnames + [CC]).uniq | ||
| existing_dirnames = ::Dir.children(get_data_dir_path) | ||
| .select { |f| ::File.directory? ::File.join(get_data_dir_path, f) } | ||
|
|
||
| (existing_dirnames - protected_dirnames).each do |dirname| | ||
| dir_path = get_dir_path(dirname) | ||
| FileUtils.rm_r(dir_path) | ||
| logger.debug("#{dir_path} is deleted") | ||
|
jsvd marked this conversation as resolved.
Outdated
|
||
| end | ||
| end | ||
|
|
||
| def trigger_download | ||
|
jsvd marked this conversation as resolved.
|
||
| if @trigger_download.false? && @trigger_download.make_true | ||
| execute_download_job | ||
|
|
||
| # check database update periodically. trigger `call` method | ||
| @scheduler = Rufus::Scheduler.new({:max_work_threads => 1}) | ||
| @scheduler.every('24h', self) | ||
| end | ||
| end | ||
|
|
||
| public | ||
|
|
||
| # scheduler callback | ||
| def call(job, time) | ||
| logger.debug "scheduler runs database update check" | ||
| execute_download_job | ||
| end | ||
|
|
||
| begin | ||
| if execute_download_job | ||
| @geoip.setup_filter(database_path) | ||
| clean_up_database | ||
| end | ||
| rescue DatabaseExpiryError => e | ||
| logger.error(e.message, :cause => e.cause, :backtrace => e.backtrace) | ||
| @geoip.terminate_filter | ||
| end | ||
| def database_path(database_type) | ||
| @states[database_type].database_path | ||
| end | ||
|
|
||
| def close | ||
| @scheduler.every_jobs.each(&:unschedule) if @scheduler | ||
|
jsvd marked this conversation as resolved.
Outdated
|
||
| end | ||
|
|
||
| def database_path | ||
| @database_path | ||
| end | ||
| def subscribe_database_path(database_type, database_path, geoip_plugin) | ||
| if database_path.nil? | ||
| trigger_download | ||
|
jsvd marked this conversation as resolved.
|
||
|
|
||
| protected | ||
| # return a valid database path or default database path | ||
| def patch_database_path(database_path) | ||
| return database_path if file_exist?(database_path) | ||
| return database_path if database_path = get_file_path("#{DB_PREFIX}#{@database_type}.#{DB_EXT}") and file_exist?(database_path) | ||
| raise "You must specify 'database => ...' in your geoip filter (I looked for '#{database_path}')" | ||
| end | ||
| logger.info "By using `online` mode, you accepted and agreed MaxMind EULA. "\ | ||
| "For more details please visit https://www.maxmind.com/en/geolite2/eula" if @states[database_type].is_eula | ||
|
|
||
| def check_age | ||
| days_without_update = (::Date.today - ::Time.at(@metadata.updated_at).to_date).to_i | ||
|
|
||
| case | ||
| when days_without_update >= 30 | ||
| raise DatabaseExpiryError, "The MaxMind database has been used for more than 30 days. Logstash is unable to get newer version from internet. "\ | ||
| "According to EULA, GeoIP plugin needs to stop in order to be compliant. "\ | ||
| "Please check the network settings and allow Logstash accesses the internet to download the latest database, "\ | ||
| "or switch to offline mode (:database => PATH_TO_YOUR_DATABASE) to use a self-managed database which you can download from https://dev.maxmind.com/geoip/geoip2/geolite2/ " | ||
| when days_without_update >= 25 | ||
| logger.warn("The MaxMind database has been used for #{days_without_update} days without update. "\ | ||
| "Logstash will stop the GeoIP plugin in #{30 - days_without_update} days. "\ | ||
| "Please check the network settings and allow Logstash accesses the internet to download the latest database ") | ||
| @states[database_type].plugins.push(geoip_plugin) unless @states[database_type].plugins.member?(geoip_plugin) | ||
| @states[database_type].database_path | ||
| else | ||
| logger.debug("The MaxMind database hasn't updated", :days_without_update => days_without_update) | ||
| logger.info "GeoIP plugin is in offline mode. Logstash points to static database files and will not check for update. "\ | ||
| "Keep in mind that if you are not using the database shipped with this plugin, "\ | ||
| "please go to https://www.maxmind.com/en/geolite2/eula to accept and agree the terms and conditions." | ||
| database_path | ||
| end | ||
| end | ||
|
|
||
| # Clean up files .mmdb, .tgz which are not mentioned in metadata and not default database | ||
| def clean_up_database | ||
| if @metadata.exist? | ||
| protected_filenames = (@metadata.database_filenames + DEFAULT_DATABASE_FILENAME).uniq | ||
| existing_filenames = ::Dir.glob(get_file_path("*.{#{DB_EXT},#{GZ_EXT}}")) | ||
| .map { |path| ::File.basename(path) } | ||
|
|
||
| (existing_filenames - protected_filenames).each do |filename| | ||
| ::File.delete(get_file_path(filename)) | ||
| logger.debug("old database #{filename} is deleted") | ||
| end | ||
| end | ||
| def unsubscribe_database_path(database_type, geoip_plugin) | ||
| @states[database_type].plugins.delete(geoip_plugin) if geoip_plugin | ||
| end | ||
|
|
||
| def setup | ||
| @metadata = DatabaseMetadata.new(@database_type) | ||
| @metadata.save_timestamp(@database_path) unless @metadata.exist? | ||
|
|
||
| @database_path = @metadata.database_path || @database_path | ||
|
|
||
| @download_manager = DownloadManager.new(@database_type, @metadata) | ||
| # create data dir, path.data, for geoip if it doesn't exist | ||
| # copy CC databases to data dir | ||
| def self.prepare_cc_db | ||
| FileUtils::mkdir_p(get_data_dir_path) | ||
| unless ::File.exist?(get_db_path(CITY, CC)) && ::File.exist?(get_db_path(ASN, CC)) | ||
| cc_database_paths = ::Dir.glob(::File.join(LogStash::Environment::LOGSTASH_HOME, "vendor", "**", "{GeoLite2-ASN,GeoLite2-City}.mmdb")) | ||
| cc_dir_path = get_dir_path(CC) | ||
| FileUtils.mkdir_p(cc_dir_path) | ||
| FileUtils.cp_r(cc_database_paths, cc_dir_path) | ||
| end | ||
|
jsvd marked this conversation as resolved.
Outdated
|
||
| end | ||
|
|
||
| class DatabaseExpiryError < StandardError | ||
| class DatabaseState | ||
| attr_reader :is_eula, :plugins, :database_path, :cc_database_path | ||
| attr_writer :is_eula, :database_path | ||
|
|
||
| # @param is_eula [Boolean] | ||
| # @param plugins [Concurrent::Array] | ||
| # @param database_path [String] | ||
| # @param cc_database_path [String] | ||
| def initialize(is_eula, plugins, database_path, cc_database_path) | ||
| @is_eula = is_eula | ||
| @plugins = plugins | ||
| @database_path = database_path | ||
| @cc_database_path = cc_database_path | ||
| end | ||
| end | ||
| end end end end | ||
Uh oh!
There was an error while loading. Please reload this page.