diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index c90a4d6d66e..52889062d22 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -107,6 +107,8 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil) @instance_reload_metric = metric.namespace([:stats, :reloads]) initialize_agent_metrics + initialize_geoip_database_metrics(metric) + @dispatcher = LogStash::EventDispatcher.new(self) LogStash::PLUGIN_REGISTRY.hooks.register_emitter(self.class, dispatcher) dispatcher.fire(:after_initialize) @@ -562,4 +564,28 @@ def update_successful_reload_metrics(action, action_result) n.gauge(:last_success_timestamp, action_result.executed_at) end end + + def initialize_geoip_database_metrics(metric) + begin + require_relative ::File.join(LogStash::Environment::LOGSTASH_HOME, "x-pack", "lib", "filters", "geoip", "database_manager") + database_manager = LogStash::Filters::Geoip::DatabaseManager.instance + database_manager.metric = metric.namespace([:geoip_download_manager]).tap do |n| + db = n.namespace([:database]) + [:ASN, :City].each do |database_type| + db_type = db.namespace([database_type]) + db_type.gauge(:status, nil) + db_type.gauge(:last_updated_at, nil) + db_type.gauge(:fail_check_in_days, 0) + end + + dl = n.namespace([:download_stats]) + dl.increment(:successes, 0) + dl.increment(:failures, 0) + dl.gauge(:last_checked_at, nil) + dl.gauge(:status, nil) + end + rescue LoadError => e + @logger.trace("DatabaseManager is not in classpath") + end + end end # class LogStash::Agent diff --git a/logstash-core/lib/logstash/api/commands/stats.rb b/logstash-core/lib/logstash/api/commands/stats.rb index 2cd0bd5cd99..9138b955d80 100644 --- a/logstash-core/lib/logstash/api/commands/stats.rb +++ b/logstash-core/lib/logstash/api/commands/stats.rb @@ -132,6 +132,12 @@ def hot_threads(options={}) HotThreadsReport.new(self, options) end + def geoip + service.get_shallow(:geoip_download_manager) + rescue + {} + end + private def plugins_stats_report(pipeline_id, extended_pipeline, opts={}) stats = service.get_shallow(:stats, :pipelines, pipeline_id.to_sym) diff --git a/logstash-core/lib/logstash/api/modules/node_stats.rb b/logstash-core/lib/logstash/api/modules/node_stats.rb index f8f8c1120e8..bbb2c0df03d 100644 --- a/logstash-core/lib/logstash/api/modules/node_stats.rb +++ b/logstash-core/lib/logstash/api/modules/node_stats.rb @@ -40,6 +40,10 @@ class NodeStats < ::LogStash::Api::Modules::Base :os => os_payload, :queue => queue } + + geoip = geoip_payload + payload[:geoip_download_manager] = geoip unless geoip.empty? || geoip[:download_stats][:status].value.nil? + respond_with(payload, {:filter => params["filter"]}) end @@ -77,6 +81,11 @@ def pipeline_payload(val = nil) opts = {:vertices => as_boolean(params.fetch("vertices", false))} @stats.pipeline(val, opts) end + + def geoip_payload + @stats.geoip + end + end end end diff --git a/rakelib/plugins-metadata.json b/rakelib/plugins-metadata.json index 269a2eca450..e2e4dea0564 100644 --- a/rakelib/plugins-metadata.json +++ b/rakelib/plugins-metadata.json @@ -158,6 +158,7 @@ }, "logstash-filter-json": { "default-plugins": true, + "core-specs": true, "skip-list": false }, "logstash-filter-kv": { diff --git a/x-pack/lib/filters/geoip/database_manager.rb b/x-pack/lib/filters/geoip/database_manager.rb index 71addb09be1..94e49365cae 100644 --- a/x-pack/lib/filters/geoip/database_manager.rb +++ b/x-pack/lib/filters/geoip/database_manager.rb @@ -15,6 +15,7 @@ require "date" require "singleton" require "concurrent" +require "time" require "thread" java_import org.apache.logging.log4j.ThreadContext @@ -37,6 +38,11 @@ module LogStash module Filters module Geoip class DatabaseManager private def initialize + @triggered = false + @trigger_lock = Mutex.new + end + + def setup prepare_cc_db cc_city_database_path = get_db_path(CITY, CC) cc_asn_database_path = get_db_path(ASN, CC) @@ -45,8 +51,6 @@ def initialize city_database_path = @metadata.database_path(CITY) asn_database_path = @metadata.database_path(ASN) - @triggered = false - @trigger_lock = Mutex.new @states = { "#{CITY}" => DatabaseState.new(@metadata.is_eula(CITY), Concurrent::Array.new, city_database_path, @@ -57,6 +61,8 @@ def initialize cc_asn_database_path) } @download_manager = DownloadManager.new(@metadata) + + initialize_metrics end protected @@ -90,10 +96,14 @@ def prepare_metadata # update metadata timestamp for those dbs that has no update or a valid update # do daily check and clean up def execute_download_job + success_cnt = 0 + begin pipeline_id = ThreadContext.get("pipeline.id") ThreadContext.put("pipeline.id", nil) + update_download_status(:updating) + updated_db = @download_manager.fetch_database updated_db.each do |database_type, valid_download, dirname, new_database_path| if valid_download @@ -106,16 +116,23 @@ def execute_download_job logger.info("geoip plugin will use database #{new_database_path}", :database_type => db_type, :pipeline_ids => ids) unless ids.empty? end + + success_cnt += 1 end end updated_types = updated_db.map { |database_type, valid_download, dirname, new_database_path| database_type } - (DB_TYPES - updated_types).each { |unchange_type| @metadata.update_timestamp(unchange_type) } + (DB_TYPES - updated_types).each do |unchange_type| + @metadata.update_timestamp(unchange_type) + success_cnt += 1 + end rescue => e logger.error(e.message, error_details(e, logger)) ensure check_age clean_up_database + update_download_metric(success_cnt) + ThreadContext.put("pipeline.id", pipeline_id) end end @@ -132,7 +149,9 @@ def check_age(database_types = DB_TYPES) database_types.map do |database_type| next unless @states[database_type].is_eula - days_without_update = (::Date.today - ::Time.at(@metadata.check_at(database_type)).to_date).to_i + metadata = @metadata.get_metadata(database_type).last + check_at = metadata[DatabaseMetadata::Column::CHECK_AT].to_i + days_without_update = time_diff_in_days(check_at) case when days_without_update >= 30 @@ -152,16 +171,34 @@ def check_age(database_types = DB_TYPES) :database_type => db_type, :pipeline_ids => ids) end end + + database_status = :expired when days_without_update >= 25 logger.warn("The MaxMind database hasn't been updated for last #{days_without_update} days. "\ "Logstash will fail the GeoIP plugin in #{30 - days_without_update} days. "\ "Please check the network settings and allow Logstash accesses the internet to download the latest database ") + database_status = :to_be_expired else logger.trace("passed age check", :days_without_update => days_without_update) + database_status = :up_to_date + end + + @metric.namespace([:database, database_type.to_sym]).tap do |n| + n.gauge(:status, database_status) + n.gauge(:last_updated_at, unix_time_to_iso8601(metadata[DatabaseMetadata::Column::DIRNAME])) + n.gauge(:fail_check_in_days, days_without_update) end end end + def time_diff_in_days(timestamp) + (::Date.today - ::Time.at(timestamp.to_i).to_date).to_i + end + + def unix_time_to_iso8601(timestamp) + Time.at(timestamp.to_i).iso8601 + end + # Clean up directories which are not mentioned in metadata and not CC database def clean_up_database protected_dirnames = (@metadata.dirnames + [CC]).uniq @@ -179,6 +216,7 @@ def trigger_download return if @triggered @trigger_lock.synchronize do return if @triggered + setup execute_download_job # check database update periodically. trigger `call` method @scheduler = Rufus::Scheduler.new({:max_work_threads => 1}) @@ -187,6 +225,43 @@ def trigger_download end end + def initialize_metrics + metadatas = @metadata.get_all + metadatas.each do |row| + type = row[DatabaseMetadata::Column::DATABASE_TYPE] + @metric.namespace([:database, type.to_sym]).tap do |n| + n.gauge(:status, @states[type].is_eula ? :up_to_date : :init) + if @states[type].is_eula + n.gauge(:last_updated_at, unix_time_to_iso8601(row[DatabaseMetadata::Column::DIRNAME])) + n.gauge(:fail_check_in_days, time_diff_in_days(row[DatabaseMetadata::Column::CHECK_AT])) + end + end + end + + @metric.namespace([:download_stats]).tap do |n| + check_at = metadatas.map { |row| row[DatabaseMetadata::Column::CHECK_AT].to_i }.max + n.gauge(:last_checked_at, unix_time_to_iso8601(check_at)) + end + end + + def update_download_metric(success_cnt) + @metric.namespace([:download_stats]).tap do |n| + n.gauge(:last_checked_at, Time.now.iso8601) + + if success_cnt == DB_TYPES.size + n.increment(:successes, 1) + n.gauge(:status, :succeeded) + else + n.increment(:failures, 1) + n.gauge(:status, :failed) + end + end + end + + def update_download_status(status) + @metric.namespace([:download_stats]).gauge(:status, status) + end + public # scheduler callback @@ -223,6 +298,10 @@ def database_path(database_type) @states[database_type].database_path end + def metric=(metric) + @metric = metric + end + class DatabaseState attr_reader :is_eula, :plugins, :database_path, :cc_database_path, :is_expired attr_writer :is_eula, :database_path, :is_expired diff --git a/x-pack/qa/integration/monitoring/geoip_metric_spec.rb b/x-pack/qa/integration/monitoring/geoip_metric_spec.rb new file mode 100644 index 00000000000..75432263f8f --- /dev/null +++ b/x-pack/qa/integration/monitoring/geoip_metric_spec.rb @@ -0,0 +1,48 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License; +# you may not use this file except in compliance with the Elastic License. + +require_relative "../spec_helper" +require_relative "../../../../qa/integration/services/monitoring_api" + +describe "GeoIP database service" do + let(:input) { "input { generator { lines => ['{\\\"host\\\": \\\"0.42.56.104\\\"}'] } } " } + let(:output) { "output { null {} }" } + let(:filter) { " " } + let(:config) { input + filter + output } + + context "monitoring API with geoip plugin" do + let(:filter) { "filter { json { source => \\\"message\\\" } geoip { source => \\\"host\\\" } } " } + + it "should have geoip" do + start_logstash + api = MonitoringAPI.new + stats = api.node_stats + + expect(stats["geoip_download_manager"]).not_to be_nil + end + end + + context "monitoring API without geoip plugin" do + it "should not have geoip" do + start_logstash + api = MonitoringAPI.new + stats = api.node_stats + + expect(stats["geoip_download_manager"]).to be_nil + end + end + + def start_logstash + @logstash_service = logstash("bin/logstash -e \"#{config}\" -w 1", { + :belzebuth => { + :wait_condition => /Pipelines running/, # Check for all pipeline started + :timeout => 5 * 60 # Fail safe, this mean something went wrong if we hit this before the wait_condition + } + }) + end + + after(:each) do + @logstash_service.stop unless @logstash_service.nil? + end +end diff --git a/x-pack/spec/filters/geoip/database_manager_spec.rb b/x-pack/spec/filters/geoip/database_manager_spec.rb index 852aca3a458..bdb13bf78f4 100644 --- a/x-pack/spec/filters/geoip/database_manager_spec.rb +++ b/x-pack/spec/filters/geoip/database_manager_spec.rb @@ -12,8 +12,11 @@ let(:mock_metadata) { double("database_metadata") } let(:mock_download_manager) { double("download_manager") } let(:mock_scheduler) { double("scheduler") } + let(:agent_metric) { LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new) } let(:db_manager) do manager = Class.new(LogStash::Filters::Geoip::DatabaseManager).instance + manager.metric = agent_metric + manager.send(:setup) manager.instance_variable_set(:@metadata, mock_metadata) manager.instance_variable_set(:@download_manager, mock_download_manager) manager.instance_variable_set(:@scheduler, mock_scheduler) @@ -43,6 +46,13 @@ expect(states[ASN].is_eula).to be_falsey expect(states[ASN].database_path).to eql(states[ASN].cc_database_path) expect(::File.exist?(states[ASN].cc_database_path)).to be_truthy + + c = db_manager.instance_variable_get(:@metric).collector + [ASN, CITY].each do |type| + expect(c.get([:database, type.to_sym], :status, :gauge).value).to eql(:init) + expect(c.get([:database, type.to_sym], :fail_check_in_days, :gauge).value).to be_nil + end + expect_initial_download_metric(c) end context "when metadata exists" do @@ -55,11 +65,20 @@ states = db_manager.instance_variable_get(:@states) expect(states[CITY].is_eula).to be_truthy expect(states[CITY].database_path).to include second_dirname + + c = db_manager.instance_variable_get(:@metric).collector + expect_second_database_metric(c) + expect_initial_download_metric(c) end end context "when metadata exists but database is deleted manually" do - let(:db_manager) { Class.new(LogStash::Filters::Geoip::DatabaseManager).instance } + let(:db_manager) do + manager = Class.new(LogStash::Filters::Geoip::DatabaseManager).instance + manager.metric = agent_metric + manager.send(:setup) + manager + end before do rewrite_temp_metadata(metadata_path, [city2_metadata]) @@ -69,8 +88,25 @@ states = db_manager.instance_variable_get(:@states) expect(states[CITY].is_eula).to be_truthy expect(states[CITY].database_path).to be_nil + + c = db_manager.instance_variable_get(:@metric).collector + expect_second_database_metric(c) + expect_initial_download_metric(c) end end + + def expect_second_database_metric(c) + expect(c.get([:database, CITY.to_sym], :status, :gauge).value).to eql(:up_to_date) + expect(c.get([:database, CITY.to_sym], :last_updated_at, :gauge).value).to match /2020-02-20/ + expect(c.get([:database, CITY.to_sym], :fail_check_in_days, :gauge).value).to eql(0) + end + + def expect_initial_download_metric(c) + expect(c.get([:download_stats], :successes, :counter).value).to eql(0) + expect(c.get([:download_stats], :failures, :counter).value).to eql(0) + expect(c.get([:download_stats], :last_checked_at, :gauge).value).to match /#{now_in_ymd}/ + expect(c.get([:download_stats], :status, :gauge).value).to be_nil + end end context "execute download job" do @@ -81,6 +117,8 @@ context "plugin is set" do let(:db_manager) do manager = Class.new(LogStash::Filters::Geoip::DatabaseManager).instance + manager.metric = agent_metric + manager.send(:setup) manager.instance_variable_set(:@metadata, mock_metadata) manager.instance_variable_set(:@download_manager, mock_download_manager) manager.instance_variable_set(:@scheduler, mock_scheduler) @@ -103,6 +141,9 @@ db_manager.send(:execute_download_job) expect(db_manager.database_path(CITY)).to match /#{second_dirname}\/#{default_city_db_name}/ expect(db_manager.database_path(ASN)).to match /#{second_dirname}\/#{default_asn_db_name}/ + + c = db_manager.instance_variable_get(:@metric).collector + expect_download_metric_success(c) end end @@ -116,6 +157,9 @@ db_manager.send(:execute_download_job) expect(db_manager.database_path(CITY)).to match /#{CC}\/#{default_city_db_name}/ expect(db_manager.database_path(ASN)).to match /#{second_dirname}\/#{default_asn_db_name}/ + + c = db_manager.instance_variable_get(:@metric).collector + expect_download_metric_fail(c) end it "should update single state and single metadata timestamp when one database got update" do @@ -128,6 +172,9 @@ db_manager.send(:execute_download_job) expect(db_manager.database_path(CITY)).to match /#{CC}\/#{default_city_db_name}/ expect(db_manager.database_path(ASN)).to match /#{second_dirname}\/#{default_asn_db_name}/ + + c = db_manager.instance_variable_get(:@metric).collector + expect_download_metric_success(c) end it "should update metadata timestamp for the unchange (no update)" do @@ -140,6 +187,9 @@ db_manager.send(:execute_download_job) expect(db_manager.database_path(CITY)).to match /#{CC}\/#{default_city_db_name}/ expect(db_manager.database_path(ASN)).to match /#{CC}\/#{default_asn_db_name}/ + + c = db_manager.instance_variable_get(:@metric).collector + expect_download_metric_success(c) end it "should not update metadata when fetch database throw exception" do @@ -149,6 +199,21 @@ expect(mock_metadata).to receive(:save_metadata).never db_manager.send(:execute_download_job) + + c = db_manager.instance_variable_get(:@metric).collector + expect_download_metric_fail(c) + end + + def expect_download_metric_success(c) + expect(c.get([:download_stats], :last_checked_at, :gauge).value).to match /#{now_in_ymd}/ + expect(c.get([:download_stats], :successes, :counter).value).to eql(1) + expect(c.get([:download_stats], :status, :gauge).value).to eql(:succeeded) + end + + def expect_download_metric_fail(c) + expect(c.get([:download_stats], :last_checked_at, :gauge).value).to match /#{now_in_ymd}/ + expect(c.get([:download_stats], :failures, :counter).value).to eql(1) + expect(c.get([:download_stats], :status, :gauge).value).to eql(:failed) end end @@ -156,6 +221,8 @@ context "eula database" do let(:db_manager) do manager = Class.new(LogStash::Filters::Geoip::DatabaseManager).instance + manager.metric = agent_metric + manager.send(:setup) manager.instance_variable_set(:@metadata, mock_metadata) manager.instance_variable_set(:@download_manager, mock_download_manager) manager.instance_variable_set(:@scheduler, mock_scheduler) @@ -167,20 +234,28 @@ end it "should give warning after 25 days" do - expect(mock_metadata).to receive(:check_at).and_return((Time.now - (60 * 60 * 24 * 26)).to_i).at_least(:twice) + mock_data = [['City', (Time.now - (60 * 60 * 24 * 26)).to_i, 'ANY', second_dirname, true]] + expect(mock_metadata).to receive(:get_metadata).and_return(mock_data).at_least(:twice) expect(mock_geoip_plugin).to receive(:update_filter).never allow(LogStash::Filters::Geoip::DatabaseManager).to receive(:logger).at_least(:once).and_return(logger) expect(logger).to receive(:warn).at_least(:twice) db_manager.send(:check_age) + + c = db_manager.instance_variable_get(:@metric).collector + expect_database_metric(c, :to_be_expired, second_dirname_in_ymd, 26) end it "should log error and update plugin filter when 30 days has passed" do - expect(mock_metadata).to receive(:check_at).and_return((Time.now - (60 * 60 * 24 * 33)).to_i).at_least(:twice) + mock_data = [['City', (Time.now - (60 * 60 * 24 * 33)).to_i, 'ANY', second_dirname, true]] + expect(mock_metadata).to receive(:get_metadata).and_return(mock_data).at_least(:twice) allow(mock_geoip_plugin).to receive_message_chain('execution_context.pipeline_id').and_return('pipeline_1', 'pipeline_2') expect(mock_geoip_plugin).to receive(:update_filter).with(:expire).at_least(:twice) db_manager.send(:check_age) + + c = db_manager.instance_variable_get(:@metric).collector + expect_database_metric(c, :expired, second_dirname_in_ymd, 33) end end @@ -190,6 +265,9 @@ expect(logger).to receive(:warn).never db_manager.send(:check_age) + + c = db_manager.instance_variable_get(:@metric).collector + expect_healthy_database_metric(c) end it "should not log error when 30 days has passed" do @@ -197,8 +275,23 @@ expect(mock_geoip_plugin).to receive(:update_filter).never db_manager.send(:check_age) + + c = db_manager.instance_variable_get(:@metric).collector + expect_healthy_database_metric(c) end end + + def expect_database_metric(c, status, download_at, days) + expect(c.get([:database, CITY.to_sym], :status, :gauge).value).to eql(status) + expect(c.get([:database, CITY.to_sym], :last_updated_at, :gauge).value).to match /#{download_at}/ + expect(c.get([:database, CITY.to_sym], :fail_check_in_days, :gauge).value).to eql(days) + end + + def expect_healthy_database_metric(c) + expect(c.get([:database, CITY.to_sym], :status, :gauge).value).to eql(:init) + expect(c.get([:database, CITY.to_sym], :last_updated_at, :gauge).value).to be_nil + expect(c.get([:database, CITY.to_sym], :fail_check_in_days, :gauge).value).to be_nil + end end context "clean up database" do @@ -245,6 +338,8 @@ context "when eula database is expired" do let(:db_manager) do manager = Class.new(LogStash::Filters::Geoip::DatabaseManager).instance + manager.metric = agent_metric + manager.send(:setup) manager.instance_variable_set(:@download_manager, mock_download_manager) manager.instance_variable_set(:@scheduler, mock_scheduler) manager @@ -267,6 +362,8 @@ context "unsubscribe" do let(:db_manager) do manager = Class.new(LogStash::Filters::Geoip::DatabaseManager).instance + manager.metric = agent_metric + manager.send(:setup) manager.instance_variable_set(:@metadata, mock_metadata) manager.instance_variable_set(:@download_manager, mock_download_manager) manager.instance_variable_set(:@scheduler, mock_scheduler) diff --git a/x-pack/spec/filters/geoip/test_helper.rb b/x-pack/spec/filters/geoip/test_helper.rb index ecbb28d1317..3d89bb966da 100644 --- a/x-pack/spec/filters/geoip/test_helper.rb +++ b/x-pack/spec/filters/geoip/test_helper.rb @@ -55,7 +55,7 @@ def second_asn_db_path end def second_dirname - "20200220" + "1582156922" end def create_default_city_gz @@ -120,6 +120,14 @@ def copy_cc(dir_path) FileUtils.mkdir_p(dir_path) FileUtils.cp_r(cc_database_paths, dir_path) end + + def now_in_ymd + Time.now.strftime('%Y-%m-%d') + end + + def second_dirname_in_ymd + Time.at(second_dirname.to_i).strftime('%Y-%m-%d') + end end RSpec.configure do |c|