diff --git a/logstash-core/lib/logstash/api/commands/node.rb b/logstash-core/lib/logstash/api/commands/node.rb index 17721bba9c8..e5b278f6f27 100644 --- a/logstash-core/lib/logstash/api/commands/node.rb +++ b/logstash-core/lib/logstash/api/commands/node.rb @@ -17,18 +17,31 @@ def all(selected_fields=[]) payload end - def pipelines + def pipelines(options={}) pipeline_ids = service.get_shallow(:stats, :pipelines).keys pipeline_ids.each_with_object({}) do |pipeline_id, result| - result[pipeline_id] = pipeline(pipeline_id) + result[pipeline_id] = pipeline(pipeline_id, options) end end - def pipeline(pipeline_id) - extract_metrics( + def pipeline(pipeline_id, options={}) + metrics = extract_metrics( [:stats, :pipelines, pipeline_id.to_sym, :config], - :workers, :batch_size, :batch_delay, :config_reload_automatic, :config_reload_interval, :dead_letter_queue_enabled, :dead_letter_queue_path + :ephemeral_id, + :hash, + :workers, + :batch_size, + :batch_delay, + :config_reload_automatic, + :config_reload_interval, + :dead_letter_queue_enabled, + :dead_letter_queue_path, + :cluster_uuids ).reject{|_, v|v.nil?} + if options.fetch(:graph, false) + metrics.merge!(extract_metrics([:stats, :pipelines, pipeline_id.to_sym, :config], :graph)) + end + metrics rescue {} end diff --git a/logstash-core/lib/logstash/api/modules/node.rb b/logstash-core/lib/logstash/api/modules/node.rb index 985a8a6a48f..66a08780957 100644 --- a/logstash-core/lib/logstash/api/modules/node.rb +++ b/logstash-core/lib/logstash/api/modules/node.rb @@ -28,11 +28,19 @@ def node get "/pipelines/:id" do pipeline_id = params["id"] - payload = node.pipeline(pipeline_id) + opts = {:graph => as_boolean(params.fetch("graph", false))} + payload = node.pipeline(pipeline_id, opts) halt(404) if payload.empty? respond_with(:pipelines => { pipeline_id => payload } ) end + get "/pipelines" do + opts = {:graph => as_boolean(params.fetch("graph", false))} + payload = node.pipelines(opts) + halt(404) if payload.empty? + respond_with(:pipelines => payload ) + end + get "/?:filter?" do selected_fields = extract_fields(params["filter"].to_s.strip) values = node.all(selected_fields) diff --git a/x-pack/lib/monitoring/inputs/metrics/state_event/lir_serializer.rb b/logstash-core/lib/logstash/config/lir_serializer.rb similarity index 82% rename from x-pack/lib/monitoring/inputs/metrics/state_event/lir_serializer.rb rename to logstash-core/lib/logstash/config/lir_serializer.rb index 54ec8bf1b64..82b1ce2ff24 100644 --- a/x-pack/lib/monitoring/inputs/metrics/state_event/lir_serializer.rb +++ b/logstash-core/lib/logstash/config/lir_serializer.rb @@ -1,12 +1,9 @@ -# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -# or more contributor license agreements. Licensed under the Elastic License; -# you may not use this file except in compliance with the Elastic License. -# -require 'monitoring/inputs/metrics' +# encoding: utf-8 require 'logstash-core' require 'logstash/compiler' -module LogStash; module Inputs; class Metrics; class StateEvent; +module LogStash; + module Config; class LIRSerializer attr_reader :lir_pipeline @@ -23,7 +20,6 @@ def serialize "hash" => lir_pipeline.unique_hash, "type" => "lir", "version" => "0.0.0", - "plugins" => plugins, "graph" => { "vertices" => vertices, "edges" => edges @@ -125,11 +121,6 @@ def format_swm(source_with_metadata) } end - def plugins - ::Gem::Specification. - find_all. - select {|spec| spec.metadata && spec.metadata["logstash_plugin"] == "true"}. - map {|spec| { :name => spec.name, :version => spec.version.to_s } } - end end -end; end; end; end + end +end diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index 8305f6c08ba..1360dd5e04d 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -6,6 +6,7 @@ require "logstash/outputs/base" require "logstash/instrument/collector" require "logstash/compiler" +require "logstash/config/lir_serializer" module LogStash; class JavaPipeline < JavaBasePipeline include LogStash::Util::Loggable @@ -216,7 +217,10 @@ def start_workers config_metric.gauge(:config_reload_interval, settings.get("config.reload.interval")) config_metric.gauge(:dead_letter_queue_enabled, dlq_enabled?) config_metric.gauge(:dead_letter_queue_path, dlq_writer.get_path.to_absolute_path.to_s) if dlq_enabled? - + config_metric.gauge(:ephemeral_id, ephemeral_id) + config_metric.gauge(:hash, lir.unique_hash) + config_metric.gauge(:graph, ::LogStash::Config::LIRSerializer.serialize(lir)) + config_metric.gauge(:cluster_uuids, resolve_cluster_uuids) @logger.info("Starting pipeline", default_logging_keys( "pipeline.workers" => pipeline_workers, @@ -255,6 +259,14 @@ def start_workers end end + def resolve_cluster_uuids + outputs.each_with_object(Set.new) do |output, cluster_uuids| + if LogStash::PluginMetadata.exists?(output.id) + cluster_uuids << LogStash::PluginMetadata.for_plugin(output.id).get(:cluster_uuid) + end + end.to_a.compact + end + def wait_inputs @input_threads.each do |thread| if thread.class == Java::JavaObject diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index 2ce46b1fc24..82add71206e 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -394,6 +394,14 @@ def output_batch(batch, output_events_map) filter_queue_client.add_output_metrics(batch.filtered_size) end + def resolve_cluster_uuids + outputs.each_with_object(Set.new) do |output, cluster_uuids| + if LogStash::PluginMetadata.exists?(output.id) + cluster_uuids << LogStash::PluginMetadata.for_plugin(output.id).get(:cluster_uuid) + end + end.to_a.compact + end + def wait_inputs @input_threads.each(&:join) end diff --git a/x-pack/lib/monitoring/inputs/metrics/state_event_factory.rb b/x-pack/lib/monitoring/inputs/metrics/state_event_factory.rb index c0ef49d0a8a..5d31d1c6c94 100644 --- a/x-pack/lib/monitoring/inputs/metrics/state_event_factory.rb +++ b/x-pack/lib/monitoring/inputs/metrics/state_event_factory.rb @@ -4,7 +4,7 @@ # module LogStash; module Inputs; class Metrics; class StateEventFactory - require "monitoring/inputs/metrics/state_event/lir_serializer" + require "logstash/config/lir_serializer" def initialize(pipeline) raise ArgumentError, "No pipeline passed in!" unless pipeline.is_a?(LogStash::Pipeline) || pipeline.is_a?(LogStash::JavaPipeline) @event = LogStash::Event.new @@ -29,7 +29,7 @@ def pipeline_data(pipeline) "ephemeral_id" => pipeline.ephemeral_id, "workers" => pipeline.settings.get("pipeline.workers"), "batch_size" => pipeline.settings.get("pipeline.batch.size"), - "representation" => ::LogStash::Inputs::Metrics::StateEvent::LIRSerializer.serialize(pipeline.lir) + "representation" => ::LogStash::Config::LIRSerializer.serialize(pipeline.lir) } end diff --git a/x-pack/spec/monitoring/inputs/metrics/state_event/lir_serializer_spec.rb b/x-pack/spec/monitoring/inputs/metrics/state_event/lir_serializer_spec.rb index 9d509670716..b4c2c64dd53 100644 --- a/x-pack/spec/monitoring/inputs/metrics/state_event/lir_serializer_spec.rb +++ b/x-pack/spec/monitoring/inputs/metrics/state_event/lir_serializer_spec.rb @@ -5,7 +5,7 @@ require "spec_helper" require "logstash/environment" -describe ::LogStash::Inputs::Metrics::StateEvent::LIRSerializer do +describe ::LogStash::Config::LIRSerializer do let(:config) do <<-EOC input { fake_input {} }