Merged

35 commits
5b83225
change DatabaseManager to singleton to ensure only one instance downl…
kaisecheng Apr 30, 2021
94ee10c
DatabaseManager ensures single download job
kaisecheng May 3, 2021
222d807
refactor DatabaseManager
kaisecheng May 4, 2021
8646194
log age check only when subscriber exist
kaisecheng May 4, 2021
1d2ab08
update log message
kaisecheng May 5, 2021
950fe8c
add bypass
kaisecheng May 5, 2021
481bb22
change metadata format
kaisecheng May 5, 2021
18a9cac
log trace
kaisecheng May 5, 2021
4d8be13
comment
kaisecheng May 5, 2021
0dd4c9d
add test case
kaisecheng May 6, 2021
f4cf2a5
fix test
kaisecheng May 6, 2021
580824b
Merge branch 'master' of github.com:elastic/logstash into geoip_multi…
kaisecheng May 10, 2021
068f9da
Merge branch 'master' of github.com:elastic/logstash into geoip_multi…
kaisecheng May 11, 2021
021d48d
update log message
kaisecheng May 11, 2021
c885b82
ensure database path return the result after initial download finished
kaisecheng Jun 2, 2021
fe26d4b
Merge branch 'master' of github.com:elastic/logstash into geoip_multi…
kaisecheng Jun 2, 2021
0f94420
move metadata preparation
kaisecheng Jun 3, 2021
439e292
adopt observable pattern
kaisecheng Jun 3, 2021
02e094e
remove plugins reference in state
kaisecheng Jun 4, 2021
7127201
change DatabaseManager to Singleton
kaisecheng Jun 7, 2021
f360013
change the metadata column name from `update_at` to `check_at`
kaisecheng Jun 7, 2021
0b0c4fb
pin geoip to 7.2
kaisecheng Jun 8, 2021
46c3ac6
fix pin gem version
kaisecheng Jun 8, 2021
c9f2fe7
minor syntax update
kaisecheng Jun 8, 2021
195083f
remove dead code
kaisecheng Jun 9, 2021
5c2c862
update test
kaisecheng Jun 9, 2021
fb2b592
remove observable
kaisecheng Jun 10, 2021
ba9bbd9
add test case
kaisecheng Jun 10, 2021
73b7a4f
remove thread context `pipeline.id`
kaisecheng Jun 14, 2021
59880b6
adjust log message
kaisecheng Jun 14, 2021
cda5900
adjust log message
kaisecheng Jun 15, 2021
79c15de
fix test
kaisecheng Jun 15, 2021
b9b248c
adjust log message
kaisecheng Jun 15, 2021
bb22ab8
Merge branch 'master' of github.com:elastic/logstash into geoip_multi…
kaisecheng Jun 16, 2021
dfca338
`7.2.0` was yanked due to missing jars dependency in manual publish. …
kaisecheng Jun 16, 2021
2 changes: 1 addition & 1 deletion logstash-core/logstash-core.gemspec
@@ -74,7 +74,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "manticore", '~> 0.6'

# xpack geoip database service
-  gem.add_development_dependency 'logstash-filter-geoip', '~> 7.1' # package hierarchy change
+  gem.add_development_dependency 'logstash-filter-geoip', '>= 7.2.1' # breaking change of DatabaseManager
gem.add_dependency 'faraday' #(MIT license)
gem.add_dependency 'down', '~> 5.2.0' #(MIT license)
gem.add_dependency 'tzinfo-data' #(MIT license)
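The constraint change above is easy to misread, so here is a small sketch using RubyGems' own requirement matching to show the difference between the old pessimistic constraint and the new floor (the version `7.1.3` is illustrative, not taken from the PR):

```ruby
require "rubygems"

# `~> 7.1` means ">= 7.1, < 8.0", so it would still accept 7.x releases that
# predate the DatabaseManager breaking change. `>= 7.2.1` sets a hard floor
# that also skips the yanked 7.2.0 mentioned in the commit list.
old_req = Gem::Requirement.new("~> 7.1")
new_req = Gem::Requirement.new(">= 7.2.1")

pre_change = Gem::Version.new("7.1.3")   # illustrative older plugin version
fixed      = Gem::Version.new("7.2.1")

old_req.satisfied_by?(pre_change)  # true: the old constraint admits it
new_req.satisfied_by?(pre_change)  # false: the new one rejects it
```

Both constraints accept `7.2.1` and later, so the change only tightens the lower bound.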
276 changes: 175 additions & 101 deletions x-pack/lib/filters/geoip/database_manager.rb
@@ -13,6 +13,10 @@
require "down"
require "rufus/scheduler"
require "date"
+ require "singleton"
+ require "concurrent"
+ require "thread"
+ java_import org.apache.logging.log4j.ThreadContext

# The mission of DatabaseManager is to ensure the plugin running an up-to-date MaxMind database and
# thus users are compliant with EULA.
@@ -21,148 +25,218 @@
# to keep track of versions and the number of days disconnects to the endpoint.
# Once a new database version release, DownloadManager downloads it, and GeoIP Filter uses it on-the-fly.
# If the last update timestamp is 25 days ago, a warning message shows in the log;
- # if it was 30 days ago, the GeoIP Filter should shutdown in order to be compliant.
+ # if it was 30 days ago, the GeoIP Filter should stop using EULA database in order to be compliant.
# There are online mode and offline mode in DatabaseManager. `online` is for automatic database update
# while `offline` is for static database path provided by users
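The 25/30-day compliance policy described in this doc comment can be sketched as a standalone helper (the function name and threshold constants here are hypothetical; the real manager reads the last-check timestamp from its metadata file):

```ruby
require "date"

# Hypothetical sketch of the EULA age-check policy: warn after 25 days
# without a successful update check, stop using the EULA database at 30.
EXPIRY_DAYS = 30
WARN_DAYS   = 25

def age_check_action(last_check_date, today = Date.today)
  days_without_update = (today - last_check_date).to_i
  if days_without_update >= EXPIRY_DAYS
    :expire   # plugin must stop using the EULA database
  elsif days_without_update >= WARN_DAYS
    :warn     # log a warning with the remaining grace period
  else
    :ok
  end
end
```

Called daily by the scheduler, this maps a last-check date onto one of the three outcomes the comments below implement.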

module LogStash module Filters module Geoip class DatabaseManager
extend LogStash::Filters::Geoip::Util
include LogStash::Util::Loggable
include LogStash::Filters::Geoip::Util

#TODO remove vendor_path
def initialize(geoip, database_path, database_type, vendor_path)
@geoip = geoip
self.class.prepare_cc_db
@mode = database_path.nil? ? :online : :offline
@mode = :disabled # This is a temporary change that turns off the database manager until it is ready for general availability.
@database_type = database_type
@database_path = patch_database_path(database_path)

if @mode == :online
logger.info "By not manually configuring a database path with `database =>`, you accepted and agreed MaxMind EULA. "\
"For more details please visit https://www.maxmind.com/en/geolite2/eula"

setup
clean_up_database
execute_download_job

# check database update periodically. trigger `call` method
@scheduler = Rufus::Scheduler.new({:max_work_threads => 1})
@scheduler.every('24h', self)
elsif @mode == :disabled
# The plan is to use CC database in Logstash 7.x and enable EULA database in 8
else
logger.info "GeoIP database path is configured manually so the plugin will not check for update. "\
"Keep in mind that if you are not using the database shipped with this plugin, "\
"please go to https://www.maxmind.com/en/geolite2/eula and understand the terms and conditions."
end
include Singleton

private
def initialize
prepare_cc_db
cc_city_database_path = get_db_path(CITY, CC)
cc_asn_database_path = get_db_path(ASN, CC)

prepare_metadata
city_database_path = @metadata.database_path(CITY)
asn_database_path = @metadata.database_path(ASN)

@triggered = false
@trigger_lock = Mutex.new
@states = { "#{CITY}" => DatabaseState.new(@metadata.is_eula(CITY),
Concurrent::Array.new,
city_database_path,
cc_city_database_path),
"#{ASN}" => DatabaseState.new(@metadata.is_eula(ASN),
Concurrent::Array.new,
asn_database_path,
cc_asn_database_path) }

@download_manager = DownloadManager.new(@metadata)
end

DEFAULT_DATABASE_FILENAME = %w{
GeoLite2-City.mmdb
GeoLite2-ASN.mmdb
}.map(&:freeze).freeze

public

protected
# create data dir, path.data, for geoip if it doesn't exist
# copy CC databases to data dir
def self.prepare_cc_db
FileUtils::mkdir_p(get_data_dir)
unless ::File.exist?(get_file_path(CITY_DB_NAME)) && ::File.exist?(get_file_path(ASN_DB_NAME))
def prepare_cc_db
FileUtils::mkdir_p(get_data_dir_path)
unless ::File.exist?(get_db_path(CITY, CC)) && ::File.exist?(get_db_path(ASN, CC))
cc_database_paths = ::Dir.glob(::File.join(LogStash::Environment::LOGSTASH_HOME, "vendor", "**", "{GeoLite2-ASN,GeoLite2-City}.mmdb"))
FileUtils.cp_r(cc_database_paths, get_data_dir)
cc_dir_path = get_dir_path(CC)
FileUtils.mkdir_p(cc_dir_path)
FileUtils.cp_r(cc_database_paths, cc_dir_path)
end
end

def prepare_metadata
@metadata = DatabaseMetadata.new

unless @metadata.exist?
@metadata.save_metadata(CITY, CC, false)
@metadata.save_metadata(ASN, CC, false)
end

# reset md5 to allow re-download when the database directory is deleted manually
DB_TYPES.each { |type| @metadata.reset_md5(type) if @metadata.database_path(type).nil? }

@metadata
end

# notify plugins to use the new database path
# update metadata timestamp for those dbs that have no update or a valid update
# do daily check and clean up
def execute_download_job
begin
has_update, new_database_path = @download_manager.fetch_database
@database_path = new_database_path if has_update
@metadata.save_timestamp(@database_path)
has_update
pipeline_id = ThreadContext.get("pipeline.id")
ThreadContext.put("pipeline.id", nil)

updated_db = @download_manager.fetch_database
updated_db.each do |database_type, valid_download, dirname, new_database_path|
if valid_download
@metadata.save_metadata(database_type, dirname, true)
@states[database_type].is_eula = true
@states[database_type].is_expired = false
@states[database_type].database_path = new_database_path

notify_plugins(database_type, :update, new_database_path) do |db_type, ids|
logger.info("geoip plugin will use database #{new_database_path}",
:database_type => db_type, :pipeline_ids => ids) unless ids.empty?
end
end
end

updated_types = updated_db.map { |database_type, valid_download, dirname, new_database_path| database_type }
(DB_TYPES - updated_types).each { |unchange_type| @metadata.update_timestamp(unchange_type) }
rescue => e
logger.error(e.message, error_details(e, logger))
ensure
check_age
false
clean_up_database
ThreadContext.put("pipeline.id", pipeline_id)
end
end

# scheduler callback
def call(job, time)
logger.debug "scheduler runs database update check"
def notify_plugins(database_type, action, *args)
plugins = @states[database_type].plugins.dup
ids = plugins.map { |plugin| plugin.execution_context.pipeline_id }.sort
yield database_type, ids
plugins.each { |plugin| plugin.update_filter(action, *args) if plugin }
end
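The notify-with-block shape of `notify_plugins` above — compute the sorted subscriber ids, hand them to the caller's logging block, then fan the action out to every subscriber — can be sketched stand-alone (the `Plugin` struct and the `update_filter` signature are stand-ins modeled on the diff, not the real plugin class):

```ruby
# Minimal sketch of the notify-with-block pattern: the block receives the
# id list for logging; each subscriber then gets the action dispatched.
Plugin = Struct.new(:pipeline_id, :received) do
  def update_filter(action, *args)
    received << [action, *args]
  end
end

def notify(plugins, action, *args)
  ids = plugins.map(&:pipeline_id).sort
  yield ids                                    # caller logs with the id list
  plugins.each { |p| p.update_filter(action, *args) }
  ids
end

a = Plugin.new("main", [])
b = Plugin.new("beats", [])
logged = nil
notify([a, b], :update, "/tmp/GeoLite2-City.mmdb") { |ids| logged = ids }
```

Duplicating the subscriber list (`.dup` in the real method) before iterating guards against plugins unsubscribing mid-notification.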

begin
if execute_download_job
@geoip.setup_filter(database_path)
clean_up_database
# call expiry action if Logstash use EULA database and fail to touch the endpoint for 30 days in a row
def check_age(database_types = DB_TYPES)
database_types.map do |database_type|
next unless @states[database_type].is_eula

days_without_update = (::Date.today - ::Time.at(@metadata.check_at(database_type)).to_date).to_i

case
when days_without_update >= 30
was_expired = @states[database_type].is_expired
@states[database_type].is_expired = true
@states[database_type].database_path = nil

notify_plugins(database_type, :expire) do |db_type, ids|
unless was_expired
logger.error("The MaxMind database hasn't been updated from last 30 days. Logstash is unable to get newer version from internet. "\
"According to EULA, GeoIP plugin needs to stop using MaxMind database in order to be compliant. "\
"Please check the network settings and allow Logstash accesses the internet to download the latest database, "\
"or switch to offline mode (:database => PATH_TO_YOUR_DATABASE) to use a self-managed database "\
"which you can download from https://dev.maxmind.com/geoip/geoip2/geolite2/ ")

logger.warn("geoip plugin will stop filtering and will tag all events with the '_geoip_expired_database' tag.",
:database_type => db_type, :pipeline_ids => ids)
end
end
when days_without_update >= 25
logger.warn("The MaxMind database hasn't been updated for last #{days_without_update} days. "\
"Logstash will fail the GeoIP plugin in #{30 - days_without_update} days. "\
"Please check the network settings and allow Logstash accesses the internet to download the latest database ")
else
logger.trace("passed age check", :days_without_update => days_without_update)
end
rescue DatabaseExpiryError => e
logger.error(e.message, error_details(e, logger))
@geoip.expire_action
end
end

def close
@scheduler.every_jobs.each(&:unschedule) if @scheduler
# Clean up directories which are not mentioned in metadata and not CC database
def clean_up_database
protected_dirnames = (@metadata.dirnames + [CC]).uniq
existing_dirnames = ::Dir.children(get_data_dir_path)
.select { |f| ::File.directory? ::File.join(get_data_dir_path, f) }

(existing_dirnames - protected_dirnames).each do |dirname|
dir_path = get_dir_path(dirname)
FileUtils.rm_r(dir_path)
logger.info("#{dir_path} is deleted")
end
end
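The retention rule in `clean_up_database` reduces to a set difference over directory names; a pure-computation sketch (helper name hypothetical; the real method then `FileUtils.rm_r`s each doomed directory):

```ruby
# Anything not referenced by the metadata and not the bundled CC directory
# is removable.
def removable_dirnames(existing_dirnames, metadata_dirnames, cc_dirname = "CC")
  protected_dirnames = (metadata_dirnames + [cc_dirname]).uniq
  existing_dirnames - protected_dirnames
end
```

Keeping the deletion logic as a difference of name lists means a stale download directory is reclaimed automatically once the metadata stops mentioning it.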

def database_path
@database_path
def trigger_download
return if @triggered
@trigger_lock.synchronize do
return if @triggered
execute_download_job
# check database update periodically. trigger `call` method
@scheduler = Rufus::Scheduler.new({:max_work_threads => 1})
@scheduler.every('24h', self)
@triggered = true
end
end
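`trigger_download` above uses a double-checked run-once guard: a cheap unsynchronized test on the hot path, then a re-check under the mutex so that only the first caller performs the download and scheduler setup. A minimal stand-alone sketch of that pattern (class name hypothetical):

```ruby
require "thread"

# Run-once guard: the outer check skips the lock on the common path; the
# inner re-check under the mutex ensures the work runs exactly once even
# when many threads race through trigger simultaneously.
class OnceTrigger
  def initialize(&work)
    @work = work
    @triggered = false
    @lock = Mutex.new
  end

  def trigger
    return if @triggered
    @lock.synchronize do
      return if @triggered
      @work.call
      @triggered = true
    end
  end
end

runs = 0
t = OnceTrigger.new { runs += 1 }
10.times.map { Thread.new { t.trigger } }.each(&:join)
```

Setting `@triggered` only after the work completes means a failed first attempt inside the lock would not be silently skipped by later callers, which matters when multiple pipelines subscribe concurrently.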

protected
# return a valid database path or default database path
def patch_database_path(database_path)
return database_path if file_exist?(database_path)
return database_path if database_path = get_file_path("#{DB_PREFIX}#{@database_type}.#{DB_EXT}") and file_exist?(database_path)
raise "You must specify 'database => ...' in your geoip filter (I looked for '#{database_path}')"
end
public

def check_age
return if @metadata.cc?

days_without_update = (::Date.today - ::Time.at(@metadata.updated_at).to_date).to_i

case
when days_without_update >= 30
raise DatabaseExpiryError, "The MaxMind database has been used for more than 30 days. Logstash is unable to get newer version from internet. "\
"According to EULA, GeoIP plugin needs to stop in order to be compliant. "\
"Please check the network settings and allow Logstash accesses the internet to download the latest database, "\
"or configure a database manually (:database => PATH_TO_YOUR_DATABASE) to use a self-managed database which you can download from https://dev.maxmind.com/geoip/geoip2/geolite2/ "
when days_without_update >= 25
logger.warn("The MaxMind database has been used for #{days_without_update} days without update. "\
"Logstash will fail the GeoIP plugin in #{30 - days_without_update} days. "\
"Please check the network settings and allow Logstash accesses the internet to download the latest database ")
else
logger.trace("The MaxMind database hasn't updated", :days_without_update => days_without_update)
end
# scheduler callback
def call(job, time)
logger.debug "scheduler runs database update check"
ThreadContext.put("pipeline.id", nil)
execute_download_job
end

# Clean up files .mmdb, .tgz which are not mentioned in metadata and not default database
def clean_up_database
if @metadata.exist?
protected_filenames = (@metadata.database_filenames + DEFAULT_DATABASE_FILENAME).uniq
existing_filenames = ::Dir.glob(get_file_path("*.{#{DB_EXT},#{GZ_EXT}}"))
.map { |path| ::File.basename(path) }

(existing_filenames - protected_filenames).each do |filename|
::File.delete(get_file_path(filename))
logger.debug("old database #{filename} is deleted")
def subscribe_database_path(database_type, database_path, geoip_plugin)
if database_path.nil?
trigger_download

logger.info "By not manually configuring a database path with `database =>`, you accepted and agreed MaxMind EULA. "\
"For more details please visit https://www.maxmind.com/en/geolite2/eula" if @states[database_type].is_eula

@states[database_type].plugins.push(geoip_plugin) unless @states[database_type].plugins.member?(geoip_plugin)
@trigger_lock.synchronize do
@states[database_type].database_path
end
else
logger.info "GeoIP database path is configured manually so the plugin will not check for update. "\
"Keep in mind that if you are not using the database shipped with this plugin, "\
"please go to https://www.maxmind.com/en/geolite2/eula and understand the terms and conditions."
database_path
end
end

def setup
@metadata = DatabaseMetadata.new(@database_type)
@metadata.save_timestamp(@database_path) unless @metadata.exist?

@database_path = @metadata.database_path || @database_path
def unsubscribe_database_path(database_type, geoip_plugin)
@states[database_type].plugins.delete(geoip_plugin) if geoip_plugin
end

@download_manager = DownloadManager.new(@database_type, @metadata)
def database_path(database_type)
@states[database_type].database_path
end

class DatabaseExpiryError < StandardError
class DatabaseState
attr_reader :is_eula, :plugins, :database_path, :cc_database_path, :is_expired
attr_writer :is_eula, :database_path, :is_expired

# @param is_eula [Boolean]
# @param plugins [Concurrent::Array]
# @param database_path [String]
# @param cc_database_path [String]
def initialize(is_eula, plugins, database_path, cc_database_path)
@is_eula = is_eula
@plugins = plugins
@database_path = database_path
@cc_database_path = cc_database_path
@is_expired = false
end
end
end end end end