Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions logstash-core/lib/logstash/api/commands/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,31 @@ def all(selected_fields=[])
payload
end

def pipelines
def pipelines(options={})
pipeline_ids = service.get_shallow(:stats, :pipelines).keys
pipeline_ids.each_with_object({}) do |pipeline_id, result|
result[pipeline_id] = pipeline(pipeline_id)
result[pipeline_id] = pipeline(pipeline_id, options)
end
end

def pipeline(pipeline_id)
extract_metrics(
def pipeline(pipeline_id, options={})
metrics = extract_metrics(
[:stats, :pipelines, pipeline_id.to_sym, :config],
:workers, :batch_size, :batch_delay, :config_reload_automatic, :config_reload_interval, :dead_letter_queue_enabled, :dead_letter_queue_path
:ephemeral_id,
:hash,
:workers,
:batch_size,
:batch_delay,
:config_reload_automatic,
:config_reload_interval,
:dead_letter_queue_enabled,
:dead_letter_queue_path,
:cluster_uuids
).reject{|_, v|v.nil?}
if options.fetch(:graph, false)
metrics.merge!(extract_metrics([:stats, :pipelines, pipeline_id.to_sym, :config], :graph))
end
metrics
rescue
{}
end
Expand Down
10 changes: 9 additions & 1 deletion logstash-core/lib/logstash/api/modules/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,19 @@ def node

get "/pipelines/:id" do
pipeline_id = params["id"]
payload = node.pipeline(pipeline_id)
opts = {:graph => as_boolean(params.fetch("graph", false))}
payload = node.pipeline(pipeline_id, opts)
halt(404) if payload.empty?
respond_with(:pipelines => { pipeline_id => payload } )
end

get "/pipelines" do
opts = {:graph => as_boolean(params.fetch("graph", false))}
payload = node.pipelines(opts)
halt(404) if payload.empty?
respond_with(:pipelines => payload )
end

get "/?:filter?" do
selected_fields = extract_fields(params["filter"].to_s.strip)
values = node.all(selected_fields)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
require 'monitoring/inputs/metrics'
# encoding: utf-8
require 'logstash-core'
require 'logstash/compiler'

module LogStash; module Inputs; class Metrics; class StateEvent;
module LogStash;
module Config;
class LIRSerializer
attr_reader :lir_pipeline

Expand All @@ -23,7 +20,6 @@ def serialize
"hash" => lir_pipeline.unique_hash,
"type" => "lir",
"version" => "0.0.0",
"plugins" => plugins,
"graph" => {
"vertices" => vertices,
"edges" => edges
Expand Down Expand Up @@ -125,11 +121,6 @@ def format_swm(source_with_metadata)
}
end

def plugins
::Gem::Specification.
find_all.
select {|spec| spec.metadata && spec.metadata["logstash_plugin"] == "true"}.
map {|spec| { :name => spec.name, :version => spec.version.to_s } }
end
end
end; end; end; end
end
end
14 changes: 13 additions & 1 deletion logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require "logstash/outputs/base"
require "logstash/instrument/collector"
require "logstash/compiler"
require "logstash/config/lir_serializer"

module LogStash; class JavaPipeline < JavaBasePipeline
include LogStash::Util::Loggable
Expand Down Expand Up @@ -216,7 +217,10 @@ def start_workers
config_metric.gauge(:config_reload_interval, settings.get("config.reload.interval"))
config_metric.gauge(:dead_letter_queue_enabled, dlq_enabled?)
config_metric.gauge(:dead_letter_queue_path, dlq_writer.get_path.to_absolute_path.to_s) if dlq_enabled?

config_metric.gauge(:ephemeral_id, ephemeral_id)
config_metric.gauge(:hash, lir.unique_hash)
config_metric.gauge(:graph, ::LogStash::Config::LIRSerializer.serialize(lir))
config_metric.gauge(:cluster_uuids, resolve_cluster_uuids)

@logger.info("Starting pipeline", default_logging_keys(
"pipeline.workers" => pipeline_workers,
Expand Down Expand Up @@ -255,6 +259,14 @@ def start_workers
end
end

def resolve_cluster_uuids
outputs.each_with_object(Set.new) do |output, cluster_uuids|
if LogStash::PluginMetadata.exists?(output.id)
cluster_uuids << LogStash::PluginMetadata.for_plugin(output.id).get(:cluster_uuid)
end
end.to_a.compact
end

def wait_inputs
@input_threads.each do |thread|
if thread.class == Java::JavaObject
Expand Down
8 changes: 8 additions & 0 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,14 @@ def output_batch(batch, output_events_map)
filter_queue_client.add_output_metrics(batch.filtered_size)
end

def resolve_cluster_uuids
outputs.each_with_object(Set.new) do |output, cluster_uuids|
if LogStash::PluginMetadata.exists?(output.id)
cluster_uuids << LogStash::PluginMetadata.for_plugin(output.id).get(:cluster_uuid)
end
end.to_a.compact
end

def wait_inputs
@input_threads.each(&:join)
end
Expand Down
4 changes: 2 additions & 2 deletions x-pack/lib/monitoring/inputs/metrics/state_event_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#
module LogStash; module Inputs; class Metrics;
class StateEventFactory
require "monitoring/inputs/metrics/state_event/lir_serializer"
require "logstash/config/lir_serializer"
def initialize(pipeline)
raise ArgumentError, "No pipeline passed in!" unless pipeline.is_a?(LogStash::Pipeline) || pipeline.is_a?(LogStash::JavaPipeline)
@event = LogStash::Event.new
Expand All @@ -29,7 +29,7 @@ def pipeline_data(pipeline)
"ephemeral_id" => pipeline.ephemeral_id,
"workers" => pipeline.settings.get("pipeline.workers"),
"batch_size" => pipeline.settings.get("pipeline.batch.size"),
"representation" => ::LogStash::Inputs::Metrics::StateEvent::LIRSerializer.serialize(pipeline.lir)
"representation" => ::LogStash::Config::LIRSerializer.serialize(pipeline.lir)
}
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
require "spec_helper"
require "logstash/environment"

describe ::LogStash::Inputs::Metrics::StateEvent::LIRSerializer do
describe ::LogStash::Config::LIRSerializer do
let(:config) do
<<-EOC
input { fake_input {} }
Expand Down