Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions logstash-core/lib/logstash/api/commands/stats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,20 @@ def plugin_stats(stats, plugin_type)
end

def refine_batch_metrics(stats)
# current is a tuple of [event_count, byte_size] store the reference locally to avoid repeatedly
# reading and retrieve unrelated values
current_data_point = stats[:batch][:current]
{
:event_count => {
# current_data_point is an instance of org.logstash.instrument.metrics.gauge.LazyDelegatingGauge so need to invoke getValue() to obtain the actual value
:current => current_data_point.value[0],
:average => {
# average return a FlowMetric which and we need to invoke getValue to obtain the map with metric details.
:lifetime => stats[:batch][:event_count][:average].value["lifetime"] ? stats[:batch][:event_count][:average].value["lifetime"].round : 0
}
},
:byte_size => {
:current => current_data_point.value[1],
:average => {
:lifetime => stats[:batch][:byte_size][:average].value["lifetime"] ? stats[:batch][:byte_size][:average].value["lifetime"].round : 0
}
Expand Down
2 changes: 2 additions & 0 deletions logstash-core/spec/logstash/api/modules/node_stats_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,13 @@
},
"batch" => {
"event_count" => {
"current" => Numeric,
"average" => {
"lifetime" => Numeric
}
},
"byte_size" => {
"current" => Numeric,
"average" => {
"lifetime" => Numeric
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.logstash.instrument.metrics.MetricType;
import org.logstash.instrument.metrics.NullMetricExt;
import org.logstash.instrument.metrics.UpScaledMetric;
import org.logstash.instrument.metrics.gauge.TextGauge;
import org.logstash.instrument.metrics.timer.TimerMetric;
import org.logstash.instrument.metrics.UptimeMetric;
import org.logstash.instrument.metrics.counter.LongCounter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.counter.LongCounter;
import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge;

import java.security.SecureRandom;
import java.util.Arrays;

import static org.logstash.instrument.metrics.MetricKeys.*;

Expand All @@ -22,6 +24,7 @@ class QueueReadClientBatchMetrics {
private LongCounter pipelineMetricBatchByteSize;
private LongCounter pipelineMetricBatchTotalEvents;
private final SecureRandom random = new SecureRandom();
private LazyDelegatingGauge currentBatchDimensions;

public QueueReadClientBatchMetrics(QueueFactoryExt.BatchMetricMode batchMetricMode) {
this.batchMetricMode = batchMetricMode;
Expand All @@ -35,16 +38,18 @@ public void setupMetrics(AbstractNamespacedMetricExt namespacedMetric) {
pipelineMetricBatchCount = LongCounter.fromRubyBase(batchNamespace, BATCH_COUNT);
pipelineMetricBatchTotalEvents = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_EVENTS);
pipelineMetricBatchByteSize = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_BYTES);
currentBatchDimensions = LazyDelegatingGauge.fromRubyBase(batchNamespace, BATCH_CURRENT_KEY);
}
}

public void updateBatchMetrics(QueueBatch batch) {
if (batch.events().isEmpty()) {
// avoid to increment batch count for empty batches
if (batchMetricMode == QueueFactoryExt.BatchMetricMode.DISABLED) {
return;
}

if (batchMetricMode == QueueFactoryExt.BatchMetricMode.DISABLED) {
if (batch.events().isEmpty()) {
// don't update averages for empty batches, but set current back to zero
currentBatchDimensions.set(Arrays.asList(0L, 0L));
return;
}

Expand All @@ -62,13 +67,14 @@ public void updateBatchMetrics(QueueBatch batch) {
private void updateBatchSizeMetric(QueueBatch batch) {
try {
// if an error occurs in estimating the size of the batch, no counter has to be updated
long totalSize = 0L;
long totalByteSize = 0L;
for (JrubyEventExtLibrary.RubyEvent rubyEvent : batch.events()) {
totalSize += rubyEvent.getEvent().estimateMemory();
totalByteSize += rubyEvent.getEvent().estimateMemory();
}
pipelineMetricBatchCount.increment();
pipelineMetricBatchTotalEvents.increment(batch.filteredSize());
pipelineMetricBatchByteSize.increment(totalSize);
pipelineMetricBatchByteSize.increment(totalByteSize);
currentBatchDimensions.set(Arrays.asList(batch.filteredSize(), totalByteSize));
} catch (IllegalArgumentException e) {
LOG.error("Failed to calculate batch byte size for metrics", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ public final class MetricExt extends AbstractSimpleMetricExt {

private static final long serialVersionUID = 1L;

public static final RubySymbol COUNTER = RubyUtil.RUBY.newSymbol("counter");
// These two metric type symbols need to be package-private because used in NamespacedMetricExt
static final RubySymbol COUNTER = RubyUtil.RUBY.newSymbol("counter");
static final RubySymbol GAUGE = RubyUtil.RUBY.newSymbol("gauge");

private static final RubyFixnum ONE = RubyUtil.RUBY.newFixnum(1);

private static final RubySymbol INCREMENT = RubyUtil.RUBY.newSymbol("increment");

private static final RubySymbol DECREMENT = RubyUtil.RUBY.newSymbol("decrement");

private static final RubySymbol GAUGE = RubyUtil.RUBY.newSymbol("gauge");
private static final RubySymbol TIMER = RubyUtil.RUBY.newSymbol("timer");
private static final RubySymbol SET = RubyUtil.RUBY.newSymbol("set");
private static final RubySymbol GET = RubyUtil.RUBY.newSymbol("get");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,6 @@ private MetricKeys() {

public static final RubySymbol BATCH_BYTE_SIZE_KEY = RubyUtil.RUBY.newSymbol("byte_size");

public static final RubySymbol BATCH_CURRENT_KEY = RubyUtil.RUBY.newSymbol("current");

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ protected IRubyObject getCounter(final ThreadContext context, final IRubyObject
@Override
protected IRubyObject getGauge(final ThreadContext context, final IRubyObject key,
final IRubyObject value) {
return metric.gauge(context, namespaceName, key, value);
metric.gauge(context, namespaceName, key, value);
return collector(context).callMethod(
context, "get", new IRubyObject[]{namespaceName, key, MetricExt.GAUGE}
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.RubyHash;
import org.jruby.RubySymbol;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.ext.JrubyTimestampExtLibrary.RubyTimestamp;
import org.logstash.instrument.metrics.AbstractMetric;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricType;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

Expand All @@ -39,11 +45,27 @@ public class LazyDelegatingGauge extends AbstractMetric<Object> implements Gauge

private static final Logger LOGGER = LogManager.getLogger(LazyDelegatingGauge.class);

public static final LazyDelegatingGauge DUMMY_GAUGE = new LazyDelegatingGauge("dummy");

protected final String key;

@SuppressWarnings("rawtypes")
private GaugeMetric lazyMetric;


public static LazyDelegatingGauge fromRubyBase(final AbstractNamespacedMetricExt metric, final RubySymbol key) {
final ThreadContext context = RubyUtil.RUBY.getCurrentContext();
// just initialize an empty gauge
final IRubyObject gauge = metric.gauge(context, key, context.runtime.newArray(context.runtime.newString("undefined"), context.runtime.newString("undefined")));
final LazyDelegatingGauge javaGauge;
if (LazyDelegatingGauge.class.isAssignableFrom(gauge.getJavaClass())) {
javaGauge = gauge.toJava(LazyDelegatingGauge.class);
} else {
javaGauge = DUMMY_GAUGE;
}
return javaGauge;
}

/**
* Constructor - null initial value
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.instrument.metrics.counter.LongCounter;
import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge;
import org.logstash.instrument.metrics.timer.TimerMetric;

import java.util.Objects;
Expand Down Expand Up @@ -36,7 +37,9 @@ public static MockNamespacedMetric create() {

@Override
protected IRubyObject getGauge(ThreadContext context, IRubyObject key, IRubyObject value) {
return null;
Objects.requireNonNull(key);
requireRubySymbol(key, "key");
return RubyUtil.toRubyObject(metrics.computeIfAbsent(key.asJavaString(), LazyDelegatingGauge::new));
}

@Override
Expand Down
6 changes: 6 additions & 0 deletions qa/integration/specs/monitoring_api_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,17 @@
expect(batch_stats["event_count"]["average"]["lifetime"]).to be_a_kind_of(Numeric)
expect(batch_stats["event_count"]["average"]["lifetime"]).to be > 0

expect(batch_stats["event_count"]["current"]).not_to be_nil
expect(batch_stats["event_count"]["current"]).to be >= 0

expect(batch_stats["byte_size"]).not_to be_nil
expect(batch_stats["byte_size"]["average"]).not_to be_nil
expect(batch_stats["byte_size"]["average"]["lifetime"]).not_to be_nil
expect(batch_stats["byte_size"]["average"]["lifetime"]).to be_a_kind_of(Numeric)
expect(batch_stats["byte_size"]["average"]["lifetime"]).to be > 0

expect(batch_stats["byte_size"]["current"]).not_to be_nil
expect(batch_stats["byte_size"]["current"]).to be >= 0
end
end
end
Expand Down