Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions logstash-core/lib/logstash/api/commands/stats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,20 @@ def plugin_stats(stats, plugin_type)
end

def refine_batch_metrics(stats)
# current is a tuple of [event_count, byte_size] store the reference locally to avoid repeatedly
# reading and retrieve unrelated values
current_data_point = stats[:batch][:current]
{
:event_count => {
# current_data_point is an instance of org.logstash.instrument.metrics.gauge.LazyDelegatingGauge so need to invoke getValue() to obtain the actual value
:current => current_data_point.value[0],
:average => {
# average return a FlowMetric which and we need to invoke getValue to obtain the map with metric details.
:lifetime => stats[:batch][:event_count][:average].value["lifetime"] ? stats[:batch][:event_count][:average].value["lifetime"].round : 0
}
},
:byte_size => {
:current => current_data_point.value[1],
:average => {
:lifetime => stats[:batch][:byte_size][:average].value["lifetime"] ? stats[:batch][:byte_size][:average].value["lifetime"].round : 0
}
Expand Down
2 changes: 2 additions & 0 deletions logstash-core/spec/logstash/api/modules/node_stats_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,13 @@
},
"batch" => {
"event_count" => {
"current" => Numeric,
"average" => {
"lifetime" => Numeric
}
},
"byte_size" => {
"current" => Numeric,
"average" => {
"lifetime" => Numeric
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.logstash.instrument.metrics.MetricType;
import org.logstash.instrument.metrics.NullMetricExt;
import org.logstash.instrument.metrics.UpScaledMetric;
import org.logstash.instrument.metrics.gauge.TextGauge;
import org.logstash.instrument.metrics.timer.TimerMetric;
import org.logstash.instrument.metrics.UptimeMetric;
import org.logstash.instrument.metrics.counter.LongCounter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.counter.LongCounter;
import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge;

import java.security.SecureRandom;
import java.util.Arrays;

import static org.logstash.instrument.metrics.MetricKeys.*;

Expand All @@ -22,6 +24,7 @@ class QueueReadClientBatchMetrics {
private LongCounter pipelineMetricBatchByteSize;
private LongCounter pipelineMetricBatchTotalEvents;
private final SecureRandom random = new SecureRandom();
private LazyDelegatingGauge currentBatchDimensions;

public QueueReadClientBatchMetrics(QueueFactoryExt.BatchMetricMode batchMetricMode) {
this.batchMetricMode = batchMetricMode;
Expand All @@ -35,16 +38,18 @@ public void setupMetrics(AbstractNamespacedMetricExt namespacedMetric) {
pipelineMetricBatchCount = LongCounter.fromRubyBase(batchNamespace, BATCH_COUNT);
pipelineMetricBatchTotalEvents = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_EVENTS);
pipelineMetricBatchByteSize = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_BYTES);
currentBatchDimensions = LazyDelegatingGauge.fromRubyBase(batchNamespace, BATCH_CURRENT_KEY);
}
}

public void updateBatchMetrics(QueueBatch batch) {
if (batch.events().isEmpty()) {
// avoid to increment batch count for empty batches
if (batchMetricMode == QueueFactoryExt.BatchMetricMode.DISABLED) {
return;
}

if (batchMetricMode == QueueFactoryExt.BatchMetricMode.DISABLED) {
if (batch.events().isEmpty()) {
// avoid to increment batch count for empty batches
currentBatchDimensions.set(Arrays.asList(0L, 0L));
return;
}

Expand All @@ -69,6 +74,7 @@ private void updateBatchSizeMetric(QueueBatch batch) {
pipelineMetricBatchCount.increment();
pipelineMetricBatchTotalEvents.increment(batch.filteredSize());
pipelineMetricBatchByteSize.increment(totalSize);
currentBatchDimensions.set(Arrays.asList(batch.filteredSize(), totalSize));
} catch (IllegalArgumentException e) {
LOG.error("Failed to calculate batch byte size for metrics", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public final class MetricExt extends AbstractSimpleMetricExt {

private static final RubySymbol DECREMENT = RubyUtil.RUBY.newSymbol("decrement");

private static final RubySymbol GAUGE = RubyUtil.RUBY.newSymbol("gauge");
public static final RubySymbol GAUGE = RubyUtil.RUBY.newSymbol("gauge");
private static final RubySymbol TIMER = RubyUtil.RUBY.newSymbol("timer");
private static final RubySymbol SET = RubyUtil.RUBY.newSymbol("set");
private static final RubySymbol GET = RubyUtil.RUBY.newSymbol("get");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,6 @@ private MetricKeys() {

public static final RubySymbol BATCH_BYTE_SIZE_KEY = RubyUtil.RUBY.newSymbol("byte_size");

public static final RubySymbol BATCH_CURRENT_KEY = RubyUtil.RUBY.newSymbol("current");

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ protected IRubyObject getCounter(final ThreadContext context, final IRubyObject
@Override
protected IRubyObject getGauge(final ThreadContext context, final IRubyObject key,
final IRubyObject value) {
return metric.gauge(context, namespaceName, key, value);
metric.gauge(context, namespaceName, key, value);
return collector(context).callMethod(
context, "get", new IRubyObject[]{namespaceName, key, MetricExt.GAUGE}
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.RubyHash;
import org.jruby.RubySymbol;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.ext.JrubyTimestampExtLibrary.RubyTimestamp;
import org.logstash.instrument.metrics.AbstractMetric;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricType;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

Expand All @@ -39,11 +45,27 @@ public class LazyDelegatingGauge extends AbstractMetric<Object> implements Gauge

private static final Logger LOGGER = LogManager.getLogger(LazyDelegatingGauge.class);

public static final LazyDelegatingGauge DUMMY_GAUGE = new LazyDelegatingGauge("dummy");

protected final String key;

@SuppressWarnings("rawtypes")
private GaugeMetric lazyMetric;


public static LazyDelegatingGauge fromRubyBase(final AbstractNamespacedMetricExt metric, final RubySymbol key) {
final ThreadContext context = RubyUtil.RUBY.getCurrentContext();
// just initialize an empty gauge
final IRubyObject gauge = metric.gauge(context, key, context.runtime.newArray(context.runtime.newString("undefined"), context.runtime.newString("undefined")));
final LazyDelegatingGauge javaGauge;
if (LazyDelegatingGauge.class.isAssignableFrom(gauge.getJavaClass())) {
javaGauge = gauge.toJava(LazyDelegatingGauge.class);
} else {
javaGauge = DUMMY_GAUGE;
}
return javaGauge;
}

/**
* Constructor - null initial value
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.instrument.metrics.counter.LongCounter;
import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge;
import org.logstash.instrument.metrics.timer.TimerMetric;

import java.util.Objects;
Expand Down Expand Up @@ -36,7 +37,9 @@ public static MockNamespacedMetric create() {

@Override
protected IRubyObject getGauge(ThreadContext context, IRubyObject key, IRubyObject value) {
return null;
Objects.requireNonNull(key);
requireRubySymbol(key, "key");
return RubyUtil.toRubyObject(metrics.computeIfAbsent(key.asJavaString(), LazyDelegatingGauge::new));
}

@Override
Expand Down
6 changes: 6 additions & 0 deletions qa/integration/specs/monitoring_api_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,17 @@
expect(batch_stats["event_count"]["average"]["lifetime"]).to be_a_kind_of(Numeric)
expect(batch_stats["event_count"]["average"]["lifetime"]).to be > 0

expect(batch_stats["event_count"]["current"]).not_to be_nil
expect(batch_stats["event_count"]["current"]).to be >= 0

expect(batch_stats["byte_size"]).not_to be_nil
expect(batch_stats["byte_size"]["average"]).not_to be_nil
expect(batch_stats["byte_size"]["average"]["lifetime"]).not_to be_nil
expect(batch_stats["byte_size"]["average"]["lifetime"]).to be_a_kind_of(Numeric)
expect(batch_stats["byte_size"]["average"]["lifetime"]).to be > 0

expect(batch_stats["byte_size"]["current"]).not_to be_nil
expect(batch_stats["byte_size"]["current"]).to be >= 0
end
end
end
Expand Down