Skip to content

Commit cc3c5ec

Browse files
authored
Refactoring of LIR serializer and exposing pipeline metrics (#10561)
* [WIP] Add ephemeral id and hash to pipeline stats Refs #10119 * Fix incorrect hash id * Add graph metrics * Include node update * LIR serializer refactor * Remove commented code * Remove more commented code * Update spec * Remove license and add encoding * Style change to make code more vertical. * Implement review suggestions * Remove commented code * Fix spec path * Add cluster UUID * Add graph?=true parameter * Add options to pipelines/ * Use pluginmetadata * Replicate change from java_pipeline to ruby pipeline
1 parent 0a92f14 commit cc3c5ec

File tree

7 files changed

+56
-24
lines changed

7 files changed

+56
-24
lines changed

logstash-core/lib/logstash/api/commands/node.rb

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,31 @@ def all(selected_fields=[])
1717
payload
1818
end
1919

20-
def pipelines
20+
def pipelines(options={})
2121
pipeline_ids = service.get_shallow(:stats, :pipelines).keys
2222
pipeline_ids.each_with_object({}) do |pipeline_id, result|
23-
result[pipeline_id] = pipeline(pipeline_id)
23+
result[pipeline_id] = pipeline(pipeline_id, options)
2424
end
2525
end
2626

27-
def pipeline(pipeline_id)
28-
extract_metrics(
27+
def pipeline(pipeline_id, options={})
28+
metrics = extract_metrics(
2929
[:stats, :pipelines, pipeline_id.to_sym, :config],
30-
:workers, :batch_size, :batch_delay, :config_reload_automatic, :config_reload_interval, :dead_letter_queue_enabled, :dead_letter_queue_path
30+
:ephemeral_id,
31+
:hash,
32+
:workers,
33+
:batch_size,
34+
:batch_delay,
35+
:config_reload_automatic,
36+
:config_reload_interval,
37+
:dead_letter_queue_enabled,
38+
:dead_letter_queue_path,
39+
:cluster_uuids
3140
).reject{|_, v|v.nil?}
41+
if options.fetch(:graph, false)
42+
metrics.merge!(extract_metrics([:stats, :pipelines, pipeline_id.to_sym, :config], :graph))
43+
end
44+
metrics
3245
rescue
3346
{}
3447
end

logstash-core/lib/logstash/api/modules/node.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,19 @@ def node
2828

2929
get "/pipelines/:id" do
3030
pipeline_id = params["id"]
31-
payload = node.pipeline(pipeline_id)
31+
opts = {:graph => as_boolean(params.fetch("graph", false))}
32+
payload = node.pipeline(pipeline_id, opts)
3233
halt(404) if payload.empty?
3334
respond_with(:pipelines => { pipeline_id => payload } )
3435
end
3536

37+
get "/pipelines" do
38+
opts = {:graph => as_boolean(params.fetch("graph", false))}
39+
payload = node.pipelines(opts)
40+
halt(404) if payload.empty?
41+
respond_with(:pipelines => payload )
42+
end
43+
3644
get "/?:filter?" do
3745
selected_fields = extract_fields(params["filter"].to_s.strip)
3846
values = node.all(selected_fields)

x-pack/lib/monitoring/inputs/metrics/state_event/lir_serializer.rb renamed to logstash-core/lib/logstash/config/lir_serializer.rb

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
1-
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2-
# or more contributor license agreements. Licensed under the Elastic License;
3-
# you may not use this file except in compliance with the Elastic License.
4-
#
5-
require 'monitoring/inputs/metrics'
1+
# encoding: utf-8
62
require 'logstash-core'
73
require 'logstash/compiler'
84

9-
module LogStash; module Inputs; class Metrics; class StateEvent;
5+
module LogStash;
6+
module Config;
107
class LIRSerializer
118
attr_reader :lir_pipeline
129

@@ -23,7 +20,6 @@ def serialize
2320
"hash" => lir_pipeline.unique_hash,
2421
"type" => "lir",
2522
"version" => "0.0.0",
26-
"plugins" => plugins,
2723
"graph" => {
2824
"vertices" => vertices,
2925
"edges" => edges
@@ -125,11 +121,6 @@ def format_swm(source_with_metadata)
125121
}
126122
end
127123

128-
def plugins
129-
::Gem::Specification.
130-
find_all.
131-
select {|spec| spec.metadata && spec.metadata["logstash_plugin"] == "true"}.
132-
map {|spec| { :name => spec.name, :version => spec.version.to_s } }
133-
end
134124
end
135-
end; end; end; end
125+
end
126+
end

logstash-core/lib/logstash/java_pipeline.rb

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
require "logstash/outputs/base"
77
require "logstash/instrument/collector"
88
require "logstash/compiler"
9+
require "logstash/config/lir_serializer"
910

1011
module LogStash; class JavaPipeline < JavaBasePipeline
1112
include LogStash::Util::Loggable
@@ -216,7 +217,10 @@ def start_workers
216217
config_metric.gauge(:config_reload_interval, settings.get("config.reload.interval"))
217218
config_metric.gauge(:dead_letter_queue_enabled, dlq_enabled?)
218219
config_metric.gauge(:dead_letter_queue_path, dlq_writer.get_path.to_absolute_path.to_s) if dlq_enabled?
219-
220+
config_metric.gauge(:ephemeral_id, ephemeral_id)
221+
config_metric.gauge(:hash, lir.unique_hash)
222+
config_metric.gauge(:graph, ::LogStash::Config::LIRSerializer.serialize(lir))
223+
config_metric.gauge(:cluster_uuids, resolve_cluster_uuids)
220224

221225
@logger.info("Starting pipeline", default_logging_keys(
222226
"pipeline.workers" => pipeline_workers,
@@ -255,6 +259,14 @@ def start_workers
255259
end
256260
end
257261

262+
def resolve_cluster_uuids
263+
outputs.each_with_object(Set.new) do |output, cluster_uuids|
264+
if LogStash::PluginMetadata.exists?(output.id)
265+
cluster_uuids << LogStash::PluginMetadata.for_plugin(output.id).get(:cluster_uuid)
266+
end
267+
end.to_a.compact
268+
end
269+
258270
def wait_inputs
259271
@input_threads.each do |thread|
260272
if thread.class == Java::JavaObject

logstash-core/lib/logstash/pipeline.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,14 @@ def output_batch(batch, output_events_map)
394394
filter_queue_client.add_output_metrics(batch.filtered_size)
395395
end
396396

397+
def resolve_cluster_uuids
398+
outputs.each_with_object(Set.new) do |output, cluster_uuids|
399+
if LogStash::PluginMetadata.exists?(output.id)
400+
cluster_uuids << LogStash::PluginMetadata.for_plugin(output.id).get(:cluster_uuid)
401+
end
402+
end.to_a.compact
403+
end
404+
397405
def wait_inputs
398406
@input_threads.each(&:join)
399407
end

x-pack/lib/monitoring/inputs/metrics/state_event_factory.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
#
55
module LogStash; module Inputs; class Metrics;
66
class StateEventFactory
7-
require "monitoring/inputs/metrics/state_event/lir_serializer"
7+
require "logstash/config/lir_serializer"
88
def initialize(pipeline)
99
raise ArgumentError, "No pipeline passed in!" unless pipeline.is_a?(LogStash::Pipeline) || pipeline.is_a?(LogStash::JavaPipeline)
1010
@event = LogStash::Event.new
@@ -29,7 +29,7 @@ def pipeline_data(pipeline)
2929
"ephemeral_id" => pipeline.ephemeral_id,
3030
"workers" => pipeline.settings.get("pipeline.workers"),
3131
"batch_size" => pipeline.settings.get("pipeline.batch.size"),
32-
"representation" => ::LogStash::Inputs::Metrics::StateEvent::LIRSerializer.serialize(pipeline.lir)
32+
"representation" => ::LogStash::Config::LIRSerializer.serialize(pipeline.lir)
3333
}
3434
end
3535

x-pack/spec/monitoring/inputs/metrics/state_event/lir_serializer_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
require "spec_helper"
66
require "logstash/environment"
77

8-
describe ::LogStash::Inputs::Metrics::StateEvent::LIRSerializer do
8+
describe ::LogStash::Config::LIRSerializer do
99
let(:config) do
1010
<<-EOC
1111
input { fake_input {} }

0 commit comments

Comments
 (0)