diff --git a/logstash-core/lib/logstash/api/commands/stats.rb b/logstash-core/lib/logstash/api/commands/stats.rb index 2ec44fe0afa..7ae782fadf1 100644 --- a/logstash-core/lib/logstash/api/commands/stats.rb +++ b/logstash-core/lib/logstash/api/commands/stats.rb @@ -173,14 +173,20 @@ def plugin_stats(stats, plugin_type) end def refine_batch_metrics(stats) + # current is a tuple of [event_count, byte_size] store the reference locally to avoid repeatedly + # reading and retrieve unrelated values + current_data_point = stats[:batch][:current] { :event_count => { + # current_data_point is an instance of org.logstash.instrument.metrics.gauge.LazyDelegatingGauge so need to invoke getValue() to obtain the actual value + :current => current_data_point.value[0], :average => { # average return a FlowMetric which and we need to invoke getValue to obtain the map with metric details. :lifetime => stats[:batch][:event_count][:average].value["lifetime"] ? stats[:batch][:event_count][:average].value["lifetime"].round : 0 } }, :byte_size => { + :current => current_data_point.value[1], :average => { :lifetime => stats[:batch][:byte_size][:average].value["lifetime"] ? stats[:batch][:byte_size][:average].value["lifetime"].round : 0 } diff --git a/logstash-core/spec/logstash/api/modules/node_stats_spec.rb b/logstash-core/spec/logstash/api/modules/node_stats_spec.rb index 71468fdd63e..136c1349dbf 100644 --- a/logstash-core/spec/logstash/api/modules/node_stats_spec.rb +++ b/logstash-core/spec/logstash/api/modules/node_stats_spec.rb @@ -150,11 +150,13 @@ }, "batch" => { "event_count" => { + "current" => Numeric, "average" => { "lifetime" => Numeric } }, "byte_size" => { + "current" => Numeric, "average" => { "lifetime" => Numeric } diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index ef12af32cd6..5bf98abff19 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -92,6 +92,7 @@ import org.logstash.instrument.metrics.MetricType; import org.logstash.instrument.metrics.NullMetricExt; import org.logstash.instrument.metrics.UpScaledMetric; +import org.logstash.instrument.metrics.gauge.TextGauge; import org.logstash.instrument.metrics.timer.TimerMetric; import org.logstash.instrument.metrics.UptimeMetric; import org.logstash.instrument.metrics.counter.LongCounter; diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java index 3ed58daaef2..a91cdf5dedf 100644 --- a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java +++ b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java @@ -7,8 +7,10 @@ import org.logstash.ext.JrubyEventExtLibrary; import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; import org.logstash.instrument.metrics.counter.LongCounter; +import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge; import java.security.SecureRandom; +import java.util.Arrays; import static org.logstash.instrument.metrics.MetricKeys.*; @@ -22,6 +24,7 @@ class QueueReadClientBatchMetrics { private LongCounter pipelineMetricBatchByteSize; private LongCounter pipelineMetricBatchTotalEvents; private final SecureRandom random = new SecureRandom(); + private LazyDelegatingGauge currentBatchDimensions; public QueueReadClientBatchMetrics(QueueFactoryExt.BatchMetricMode batchMetricMode) { this.batchMetricMode = batchMetricMode; @@ -35,16 +38,18 @@ public void setupMetrics(AbstractNamespacedMetricExt namespacedMetric) { pipelineMetricBatchCount = LongCounter.fromRubyBase(batchNamespace, BATCH_COUNT); pipelineMetricBatchTotalEvents = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_EVENTS); pipelineMetricBatchByteSize = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_BYTES); + currentBatchDimensions = LazyDelegatingGauge.fromRubyBase(batchNamespace, BATCH_CURRENT_KEY); } } public void updateBatchMetrics(QueueBatch batch) { - if (batch.events().isEmpty()) { - // avoid to increment batch count for empty batches + if (batchMetricMode == QueueFactoryExt.BatchMetricMode.DISABLED) { return; } - if (batchMetricMode == QueueFactoryExt.BatchMetricMode.DISABLED) { + if (batch.events().isEmpty()) { + // don't update averages for empty batches, but set current back to zero + currentBatchDimensions.set(Arrays.asList(0L, 0L)); return; } @@ -62,13 +67,14 @@ public void updateBatchMetrics(QueueBatch batch) { private void updateBatchSizeMetric(QueueBatch batch) { try { // if an error occurs in estimating the size of the batch, no counter has to be updated - long totalSize = 0L; + long totalByteSize = 0L; for (JrubyEventExtLibrary.RubyEvent rubyEvent : batch.events()) { - totalSize += rubyEvent.getEvent().estimateMemory(); + totalByteSize += rubyEvent.getEvent().estimateMemory(); } pipelineMetricBatchCount.increment(); pipelineMetricBatchTotalEvents.increment(batch.filteredSize()); - pipelineMetricBatchByteSize.increment(totalSize); + pipelineMetricBatchByteSize.increment(totalByteSize); + currentBatchDimensions.set(Arrays.asList(batch.filteredSize(), totalByteSize)); } catch (IllegalArgumentException e) { LOG.error("Failed to calculate batch byte size for metrics", e); } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricExt.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricExt.java index 1303e1a753a..f842500e19d 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricExt.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricExt.java @@ -44,15 +44,15 @@ public final class MetricExt extends AbstractSimpleMetricExt { private static final long serialVersionUID = 1L; - public static final RubySymbol COUNTER = RubyUtil.RUBY.newSymbol("counter"); + // These two metric type symbols need to be package-private because used in NamespacedMetricExt + static final RubySymbol COUNTER = RubyUtil.RUBY.newSymbol("counter"); + static final RubySymbol GAUGE = RubyUtil.RUBY.newSymbol("gauge"); private static final RubyFixnum ONE = RubyUtil.RUBY.newFixnum(1); private static final RubySymbol INCREMENT = RubyUtil.RUBY.newSymbol("increment"); private static final RubySymbol DECREMENT = RubyUtil.RUBY.newSymbol("decrement"); - - private static final RubySymbol GAUGE = RubyUtil.RUBY.newSymbol("gauge"); private static final RubySymbol TIMER = RubyUtil.RUBY.newSymbol("timer"); private static final RubySymbol SET = RubyUtil.RUBY.newSymbol("set"); private static final RubySymbol GET = RubyUtil.RUBY.newSymbol("get"); diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java index 15f540b3dca..e3043517282 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java @@ -131,4 +131,6 @@ private MetricKeys() { public static final RubySymbol BATCH_BYTE_SIZE_KEY = RubyUtil.RUBY.newSymbol("byte_size"); + public static final RubySymbol BATCH_CURRENT_KEY = RubyUtil.RUBY.newSymbol("current"); + } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/NamespacedMetricExt.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/NamespacedMetricExt.java index 8a77b4e2f03..388cfa7de7b 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/NamespacedMetricExt.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/NamespacedMetricExt.java @@ -76,7 +76,10 @@ protected IRubyObject getCounter(final ThreadContext context, final IRubyObject @Override protected IRubyObject getGauge(final ThreadContext context, final IRubyObject key, final IRubyObject value) { - return metric.gauge(context, namespaceName, key, value); + metric.gauge(context, namespaceName, key, value); + return collector(context).callMethod( + context, "get", new IRubyObject[]{namespaceName, key, MetricExt.GAUGE} + ); } @Override diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java index cf9f30ad8bf..10c10fceef5 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java @@ -23,10 +23,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jruby.RubyHash; +import org.jruby.RubySymbol; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.RubyUtil; import org.logstash.ext.JrubyTimestampExtLibrary.RubyTimestamp; import org.logstash.instrument.metrics.AbstractMetric; +import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; import org.logstash.instrument.metrics.MetricType; +import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -39,11 +45,27 @@ public class LazyDelegatingGauge extends AbstractMetric implements Gauge private static final Logger LOGGER = LogManager.getLogger(LazyDelegatingGauge.class); + public static final LazyDelegatingGauge DUMMY_GAUGE = new LazyDelegatingGauge("dummy"); + protected final String key; @SuppressWarnings("rawtypes") private GaugeMetric lazyMetric; + + public static LazyDelegatingGauge fromRubyBase(final AbstractNamespacedMetricExt metric, final RubySymbol key) { + final ThreadContext context = RubyUtil.RUBY.getCurrentContext(); + // just initialize an empty gauge + final IRubyObject gauge = metric.gauge(context, key, context.runtime.newArray(context.runtime.newString("undefined"), context.runtime.newString("undefined"))); + final LazyDelegatingGauge javaGauge; + if (LazyDelegatingGauge.class.isAssignableFrom(gauge.getJavaClass())) { + javaGauge = gauge.toJava(LazyDelegatingGauge.class); + } else { + javaGauge = DUMMY_GAUGE; + } + return javaGauge; + } + /** * Constructor - null initial value * diff --git a/logstash-core/src/test/java/org/logstash/instrument/metrics/MockNamespacedMetric.java b/logstash-core/src/test/java/org/logstash/instrument/metrics/MockNamespacedMetric.java index 91ea0e49d01..2f784fa9119 100644 --- a/logstash-core/src/test/java/org/logstash/instrument/metrics/MockNamespacedMetric.java +++ b/logstash-core/src/test/java/org/logstash/instrument/metrics/MockNamespacedMetric.java @@ -9,6 +9,7 @@ import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; import org.logstash.instrument.metrics.counter.LongCounter; +import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge; import org.logstash.instrument.metrics.timer.TimerMetric; import java.util.Objects; @@ -36,7 +37,9 @@ public static MockNamespacedMetric create() { @Override protected IRubyObject getGauge(ThreadContext context, IRubyObject key, IRubyObject value) { - return null; + Objects.requireNonNull(key); + requireRubySymbol(key, "key"); + return RubyUtil.toRubyObject(metrics.computeIfAbsent(key.asJavaString(), LazyDelegatingGauge::new)); } @Override diff --git a/qa/integration/specs/monitoring_api_spec.rb b/qa/integration/specs/monitoring_api_spec.rb index a77c113ca78..4c3e43a8245 100644 --- a/qa/integration/specs/monitoring_api_spec.rb +++ b/qa/integration/specs/monitoring_api_spec.rb @@ -269,11 +269,17 @@ expect(batch_stats["event_count"]["average"]["lifetime"]).to be_a_kind_of(Numeric) expect(batch_stats["event_count"]["average"]["lifetime"]).to be > 0 + expect(batch_stats["event_count"]["current"]).not_to be_nil + expect(batch_stats["event_count"]["current"]).to be >= 0 + expect(batch_stats["byte_size"]).not_to be_nil expect(batch_stats["byte_size"]["average"]).not_to be_nil expect(batch_stats["byte_size"]["average"]["lifetime"]).not_to be_nil expect(batch_stats["byte_size"]["average"]["lifetime"]).to be_a_kind_of(Numeric) expect(batch_stats["byte_size"]["average"]["lifetime"]).to be > 0 + + expect(batch_stats["byte_size"]["current"]).not_to be_nil + expect(batch_stats["byte_size"]["current"]).to be >= 0 end end end