Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)
@instance_reload_metric = metric.namespace([:stats, :reloads])
initialize_agent_metrics

initialize_geoip_database_metrics(metric)

@dispatcher = LogStash::EventDispatcher.new(self)
LogStash::PLUGIN_REGISTRY.hooks.register_emitter(self.class, dispatcher)
dispatcher.fire(:after_initialize)
Expand Down Expand Up @@ -562,4 +564,28 @@ def update_successful_reload_metrics(action, action_result)
n.gauge(:last_success_timestamp, action_result.executed_at)
end
end

# Wire the x-pack GeoIP database manager into the agent's metric tree.
# The manager lives under x-pack, which may be absent (OSS build); in that
# case the require raises LoadError and geoip download metrics are skipped.
#
# Gauges/counters are pre-seeded so the node stats API exposes a stable
# shape before the first download job runs.
#
# @param metric [Object] root agent metric namespace
def initialize_geoip_database_metrics(metric)
  require_relative ::File.join(LogStash::Environment::LOGSTASH_HOME, "x-pack", "lib", "filters", "geoip", "database_manager")
  database_manager = LogStash::Filters::Geoip::DatabaseManager.instance
  database_manager.metric = metric.namespace([:geoip_download_manager]).tap do |n|
    db = n.namespace([:database])
    # Seed per-database gauges for both supported database types.
    [:ASN, :City].each do |database_type|
      db_type = db.namespace([database_type])
      db_type.gauge(:status, nil)
      db_type.gauge(:last_updated_at, nil)
      db_type.gauge(:fail_check_in_days, 0)
    end

    dl = n.namespace([:download_stats])
    dl.increment(:successes, 0)
    dl.increment(:failures, 0)
    dl.gauge(:last_checked_at, nil)
    dl.gauge(:status, nil)
  end
rescue LoadError
  # Expected on OSS distributions where x-pack is not shipped.
  @logger.trace("DatabaseManager is not in classpath")
end
end # class LogStash::Agent
6 changes: 6 additions & 0 deletions logstash-core/lib/logstash/api/commands/stats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ def hot_threads(options={})
HotThreadsReport.new(self, options)
end

# Shallow snapshot of the :geoip_download_manager metric namespace.
# Falls back to an empty hash when the namespace is missing (x-pack
# absent, or metrics not registered yet).
def geoip
  begin
    service.get_shallow(:geoip_download_manager)
  rescue => _e
    {}
  end
end

private
def plugins_stats_report(pipeline_id, extended_pipeline, opts={})
stats = service.get_shallow(:stats, :pipelines, pipeline_id.to_sym)
Expand Down
9 changes: 9 additions & 0 deletions logstash-core/lib/logstash/api/modules/node_stats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ class NodeStats < ::LogStash::Api::Modules::Base
:os => os_payload,
:queue => queue
}

geoip = geoip_payload
payload[:geoip_download_manager] = geoip unless geoip.empty? || geoip[:download_stats][:status].value.nil?

respond_with(payload, {:filter => params["filter"]})
end

Expand Down Expand Up @@ -77,6 +81,11 @@ def pipeline_payload(val = nil)
opts = {:vertices => as_boolean(params.fetch("vertices", false))}
@stats.pipeline(val, opts)
end

# GeoIP download manager stats for the node stats payload; delegates to
# the stats command, which returns {} when the namespace is unavailable.
def geoip_payload
@stats.geoip
end

end
end
end
Expand Down
1 change: 1 addition & 0 deletions rakelib/plugins-metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
},
"logstash-filter-json": {
"default-plugins": true,
"core-specs": true,
"skip-list": false
},
"logstash-filter-kv": {
Expand Down
87 changes: 83 additions & 4 deletions x-pack/lib/filters/geoip/database_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
require "date"
require "singleton"
require "concurrent"
require "time"
require "thread"
java_import org.apache.logging.log4j.ThreadContext

Expand All @@ -37,6 +38,11 @@ module LogStash module Filters module Geoip class DatabaseManager

private
# No download has been triggered yet; the mutex guards the one-time
# trigger performed by #trigger_download.
def initialize
  @triggered, @trigger_lock = false, Mutex.new
end

def setup
prepare_cc_db
cc_city_database_path = get_db_path(CITY, CC)
cc_asn_database_path = get_db_path(ASN, CC)
Expand All @@ -45,8 +51,6 @@ def initialize
city_database_path = @metadata.database_path(CITY)
asn_database_path = @metadata.database_path(ASN)

@triggered = false
@trigger_lock = Mutex.new
@states = { "#{CITY}" => DatabaseState.new(@metadata.is_eula(CITY),
Concurrent::Array.new,
city_database_path,
Expand All @@ -57,6 +61,8 @@ def initialize
cc_asn_database_path) }

@download_manager = DownloadManager.new(@metadata)

initialize_metrics
end

protected
Expand Down Expand Up @@ -90,10 +96,14 @@ def prepare_metadata
# update metadata timestamp for those dbs that has no update or a valid update
# do daily check and clean up
def execute_download_job
success_cnt = 0

begin
pipeline_id = ThreadContext.get("pipeline.id")
ThreadContext.put("pipeline.id", nil)

update_download_status(:updating)

updated_db = @download_manager.fetch_database
updated_db.each do |database_type, valid_download, dirname, new_database_path|
if valid_download
Expand All @@ -106,16 +116,23 @@ def execute_download_job
logger.info("geoip plugin will use database #{new_database_path}",
:database_type => db_type, :pipeline_ids => ids) unless ids.empty?
end

success_cnt += 1
end
end

updated_types = updated_db.map { |database_type, valid_download, dirname, new_database_path| database_type }
(DB_TYPES - updated_types).each { |unchange_type| @metadata.update_timestamp(unchange_type) }
(DB_TYPES - updated_types).each do |unchange_type|
@metadata.update_timestamp(unchange_type)
success_cnt += 1
end
rescue => e
logger.error(e.message, error_details(e, logger))
ensure
check_age
clean_up_database
update_download_metric(success_cnt)

ThreadContext.put("pipeline.id", pipeline_id)
end
end
Expand All @@ -132,7 +149,9 @@ def check_age(database_types = DB_TYPES)
database_types.map do |database_type|
next unless @states[database_type].is_eula

days_without_update = (::Date.today - ::Time.at(@metadata.check_at(database_type)).to_date).to_i
metadata = @metadata.get_metadata(database_type).last
check_at = metadata[DatabaseMetadata::Column::CHECK_AT].to_i
days_without_update = time_diff_in_days(check_at)

case
when days_without_update >= 30
Expand All @@ -152,16 +171,34 @@ def check_age(database_types = DB_TYPES)
:database_type => db_type, :pipeline_ids => ids)
end
end

database_status = :expired
when days_without_update >= 25
logger.warn("The MaxMind database hasn't been updated for last #{days_without_update} days. "\
"Logstash will fail the GeoIP plugin in #{30 - days_without_update} days. "\
"Please check the network settings and allow Logstash accesses the internet to download the latest database ")
database_status = :to_be_expired
else
logger.trace("passed age check", :days_without_update => days_without_update)
database_status = :up_to_date
end

@metric.namespace([:database, database_type.to_sym]).tap do |n|
n.gauge(:status, database_status)
n.gauge(:last_updated_at, unix_time_to_iso8601(metadata[DatabaseMetadata::Column::DIRNAME]))
n.gauge(:fail_check_in_days, days_without_update)
end
end
end

# Whole calendar days (local time) between the given unix timestamp
# and today.
def time_diff_in_days(timestamp)
  reference_date = ::Time.at(timestamp.to_i).to_date
  (::Date.today - reference_date).to_i
end

# Render a unix timestamp (integer or numeric string) as an ISO8601
# string in the local time zone.
def unix_time_to_iso8601(timestamp)
  seconds = timestamp.to_i
  Time.at(seconds).iso8601
end

# Clean up directories which are not mentioned in metadata and not CC database
def clean_up_database
protected_dirnames = (@metadata.dirnames + [CC]).uniq
Expand All @@ -179,6 +216,7 @@ def trigger_download
return if @triggered
@trigger_lock.synchronize do
return if @triggered
setup
execute_download_job
# check database update periodically. trigger `call` method
@scheduler = Rufus::Scheduler.new({:max_work_threads => 1})
Expand All @@ -187,6 +225,43 @@ def trigger_download
end
end

# Seed the geoip metric namespaces from persisted metadata so the node
# stats API reports sensible values before the first scheduled download.
def initialize_metrics
  metadatas = @metadata.get_all
  metadatas.each do |row|
    type = row[DatabaseMetadata::Column::DATABASE_TYPE]
    @metric.namespace([:database, type.to_sym]).tap do |n|
      # EULA databases start as :up_to_date until check_age demotes them;
      # CC (free) databases just report :init.
      n.gauge(:status, @states[type].is_eula ? :up_to_date : :init)
      if @states[type].is_eula
        # NOTE(review): DIRNAME is fed to unix_time_to_iso8601 here and in
        # check_age — presumably download dirs are named by epoch seconds.
        n.gauge(:last_updated_at, unix_time_to_iso8601(row[DatabaseMetadata::Column::DIRNAME]))
        n.gauge(:fail_check_in_days, time_diff_in_days(row[DatabaseMetadata::Column::CHECK_AT]))
      end
    end
  end

  @metric.namespace([:download_stats]).tap do |n|
    latest_check_at = metadatas.map { |row| row[DatabaseMetadata::Column::CHECK_AT].to_i }.max
    # Guard the empty-metadata case: `max` returns nil there, and gauging
    # unix_time_to_iso8601(nil) would misreport the Unix epoch as the
    # last check time; leave the pre-seeded nil gauge untouched instead.
    n.gauge(:last_checked_at, unix_time_to_iso8601(latest_check_at)) unless latest_check_at.nil?
  end
end

# Record the outcome of one download job under download_stats: bump the
# success/failure counter and set the terminal status. A run only counts
# as a success when every database type succeeded.
#
# @param success_cnt [Integer] number of database types updated cleanly
def update_download_metric(success_cnt)
  all_types_succeeded = (success_cnt == DB_TYPES.size)

  @metric.namespace([:download_stats]).tap do |stats|
    stats.gauge(:last_checked_at, Time.now.iso8601)

    if all_types_succeeded
      stats.increment(:successes, 1)
      stats.gauge(:status, :succeeded)
    else
      stats.increment(:failures, 1)
      stats.gauge(:status, :failed)
    end
  end
end

# Publish an in-flight download state (e.g. :updating) on
# download_stats.status; overwritten with :succeeded/:failed by
# #update_download_metric when the job finishes.
def update_download_status(status)
  @metric.namespace([:download_stats]).gauge(:status, status)
end

public

# scheduler callback
Expand Down Expand Up @@ -223,6 +298,10 @@ def database_path(database_type)
@states[database_type].database_path
end

# Injection point for the agent's metric namespace; set from
# LogStash::Agent#initialize_geoip_database_metrics before any
# download job publishes metrics.
def metric=(metric)
@metric = metric
end

class DatabaseState
attr_reader :is_eula, :plugins, :database_path, :cc_database_path, :is_expired
attr_writer :is_eula, :database_path, :is_expired
Expand Down
48 changes: 48 additions & 0 deletions x-pack/qa/integration/monitoring/geoip_metric_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.

require_relative "../spec_helper"
require_relative "../../../../qa/integration/services/monitoring_api"

# Integration check: the node stats API should expose the
# geoip_download_manager section only when a pipeline uses the geoip filter.
describe "GeoIP database service" do
# Quotes are double-escaped because the config is passed through
# `bin/logstash -e "..."` on a shell command line.
let(:input) { "input { generator { lines => ['{\\\"host\\\": \\\"0.42.56.104\\\"}'] } } " }
let(:output) { "output { null {} }" }
# Overridden per-context; the geoip context injects a geoip filter here.
let(:filter) { " " }
let(:config) { input + filter + output }

context "monitoring API with geoip plugin" do
let(:filter) { "filter { json { source => \\\"message\\\" } geoip { source => \\\"host\\\" } } " }

it "should have geoip" do
start_logstash
api = MonitoringAPI.new
stats = api.node_stats

expect(stats["geoip_download_manager"]).not_to be_nil
end
end

context "monitoring API without geoip plugin" do
it "should not have geoip" do
start_logstash
api = MonitoringAPI.new
stats = api.node_stats

expect(stats["geoip_download_manager"]).to be_nil
end
end

# Boot a single-worker Logstash with the context's config and block until
# the pipelines report running (or the fail-safe timeout trips).
def start_logstash
@logstash_service = logstash("bin/logstash -e \"#{config}\" -w 1", {
:belzebuth => {
:wait_condition => /Pipelines running/, # Check for all pipeline started
:timeout => 5 * 60 # Fail safe, this mean something went wrong if we hit this before the wait_condition
}
})
end

after(:each) do
@logstash_service.stop unless @logstash_service.nil?
end
end
Loading