diff --git a/config/logstash.yml b/config/logstash.yml
index e4d008ca5cd..9658274fa64 100644
--- a/config/logstash.yml
+++ b/config/logstash.yml
@@ -49,6 +49,13 @@
#
# pipeline.batch.delay: 50
#
+# Set the pipeline's batch metrics sampling mode. Use "disabled" to turn batch metrics off,
+# "minimal" to collect metrics for only a small sample of batches, or "full" to collect
+# metrics for every batch. Default is "minimal".
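+# When enabled, the per-pipeline batch averages are exposed by the node stats API under
+# pipelines.<id>.batch (event_count.average.lifetime and byte_size.average.lifetime).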
+#
+# pipeline.batch.metrics.sampling_mode: "minimal"
+pipeline.batch.metrics.sampling_mode: minimal
+#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
diff --git a/logstash-core/lib/logstash/api/commands/stats.rb b/logstash-core/lib/logstash/api/commands/stats.rb
index 5bf1b3e3a09..2ec44fe0afa 100644
--- a/logstash-core/lib/logstash/api/commands/stats.rb
+++ b/logstash-core/lib/logstash/api/commands/stats.rb
@@ -172,6 +172,23 @@ def plugin_stats(stats, plugin_type)
end
end
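+ # Flatten the batch FlowMetrics into plain rounded numbers for the API response,
+ # exposing only the lifetime averages of event_count and byte_size.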
+ def refine_batch_metrics(stats)
+ {
+ :event_count => {
+ :average => {
+ # :average is a FlowMetric; we call value (Java getValue) to obtain the map with the metric details.
+ :lifetime => stats[:batch][:event_count][:average].value["lifetime"] ? stats[:batch][:event_count][:average].value["lifetime"].round : 0
+ }
+ },
+ :byte_size => {
+ :average => {
+ :lifetime => stats[:batch][:byte_size][:average].value["lifetime"] ? stats[:batch][:byte_size][:average].value["lifetime"].round : 0
+ }
+ }
+ }
+ end
+ private :refine_batch_metrics
+
def report(stats, extended_stats = nil, opts = {})
ret = {
:events => stats[:events],
@@ -190,6 +207,7 @@ def report(stats, extended_stats = nil, opts = {})
:batch_delay => stats.dig(:config, :batch_delay),
}
}
+ ret[:batch] = refine_batch_metrics(stats) if stats.include?(:batch)
ret[:dead_letter_queue] = stats[:dlq] if stats.include?(:dlq)
# if extended_stats were provided, enrich the return value
diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb
index 49978fad31a..de473017a75 100644
--- a/logstash-core/lib/logstash/environment.rb
+++ b/logstash-core/lib/logstash/environment.rb
@@ -86,6 +86,7 @@ def self.as_java_range(r)
Setting::ExistingFilePath.new("api.ssl.keystore.path", nil, false).nullable,
Setting::PasswordSetting.new("api.ssl.keystore.password", nil, false).nullable,
Setting::StringArray.new("api.ssl.supported_protocols", nil, true, %w[TLSv1 TLSv1.1 TLSv1.2 TLSv1.3]),
+ Setting::StringSetting.new("pipeline.batch.metrics.sampling_mode", "minimal", true, ["disabled", "minimal", "full"]),
Setting::StringSetting.new("queue.type", "memory", true, ["persisted", "memory"]),
Setting::BooleanSetting.new("queue.drain", false),
Setting::Bytes.new("queue.page_capacity", "64mb"),
diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb
index 11ce715bea2..600b689d0f1 100644
--- a/logstash-core/lib/logstash/java_pipeline.rb
+++ b/logstash-core/lib/logstash/java_pipeline.rb
@@ -267,6 +267,7 @@ def start_workers
@preserve_event_order = preserve_event_order?(pipeline_workers)
batch_size = settings.get("pipeline.batch.size")
batch_delay = settings.get("pipeline.batch.delay")
+ batch_metric_sampling = settings.get("pipeline.batch.metrics.sampling_mode")
max_inflight = batch_size * pipeline_workers
@@ -287,6 +288,7 @@ def start_workers
"pipeline.batch.size" => batch_size,
"pipeline.batch.delay" => batch_delay,
"pipeline.max_inflight" => max_inflight,
+ "batch_metric_sampling" => batch_metric_sampling,
"pipeline.sources" => pipeline_source_details)
@logger.info("Starting pipeline", pipeline_log_params)
diff --git a/logstash-core/lib/logstash/settings.rb b/logstash-core/lib/logstash/settings.rb
index e90d5523317..3cf25974208 100644
--- a/logstash-core/lib/logstash/settings.rb
+++ b/logstash-core/lib/logstash/settings.rb
@@ -58,6 +58,7 @@ def self.included(base)
"path.dead_letter_queue",
"path.queue",
"pipeline.batch.delay",
+ "pipeline.batch.metrics.sampling_mode",
"pipeline.batch.size",
"pipeline.id",
"pipeline.reloadable",
diff --git a/logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb b/logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb
index ccd37f6bb4e..c7c1eae328a 100644
--- a/logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb
+++ b/logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb
@@ -38,7 +38,7 @@
end
let(:queue) do
- described_class.new(queue_settings)
+ described_class.new(queue_settings, org.logstash.ackedqueue.QueueFactoryExt::BatchMetricMode::DISABLED)
end
let(:writer_threads) do
diff --git a/logstash-core/spec/logstash/api/modules/node_stats_spec.rb b/logstash-core/spec/logstash/api/modules/node_stats_spec.rb
index 48b73f80ce5..71468fdd63e 100644
--- a/logstash-core/spec/logstash/api/modules/node_stats_spec.rb
+++ b/logstash-core/spec/logstash/api/modules/node_stats_spec.rb
@@ -21,8 +21,13 @@
require "logstash/api/modules/node_stats"
describe LogStash::Api::Modules::NodeStats do
- # enable PQ to ensure PQ-related metrics are present
- include_context "api setup", {"queue.type" => "persisted"}
+
+ include_context "api setup", {
+ # enable PQ to ensure PQ-related metrics are present
+ "queue.type" => "persisted",
+ # enable batch metrics
+ "pipeline.batch.metrics.sampling_mode" => "full"
+ }
include_examples "not found"
extend ResourceDSLMethods
@@ -142,6 +147,18 @@
"path" => String,
"free_space_in_bytes" => Numeric
}
+ },
+ "batch" => {
+ "event_count" => {
+ "average" => {
+ "lifetime" => Numeric
+ }
+ },
+ "byte_size" => {
+ "average" => {
+ "lifetime" => Numeric
+ }
+ }
}
}
},
diff --git a/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb b/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb
index 1c1f9a1fb2d..3fa5bf325ad 100644
--- a/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb
+++ b/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb
@@ -113,7 +113,7 @@ def threaded_read_client
end
context "WrappedSynchronousQueue" do
- let(:queue) { LogStash::WrappedSynchronousQueue.new(1024) }
+ let(:queue) { LogStash::WrappedSynchronousQueue.new(1024, org.logstash.ackedqueue.QueueFactoryExt::BatchMetricMode::DISABLED) }
before do
read_client.set_events_metric(metric.namespace([:stats, :events]))
@@ -136,7 +136,9 @@ def threaded_read_client
.build
end
- let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings) }
+ let(:queue) do
+ LogStash::WrappedAckedQueue.new(queue_settings, org.logstash.ackedqueue.QueueFactoryExt::BatchMetricMode::DISABLED)
+ end
before do
read_client.set_events_metric(metric.namespace([:stats, :events]))
diff --git a/logstash-core/spec/logstash/queue_factory_spec.rb b/logstash-core/spec/logstash/queue_factory_spec.rb
index 540113412c4..7297e92dc43 100644
--- a/logstash-core/spec/logstash/queue_factory_spec.rb
+++ b/logstash-core/spec/logstash/queue_factory_spec.rb
@@ -31,6 +31,7 @@
LogStash::Setting::NumericSetting.new("queue.checkpoint.writes", 1024),
LogStash::Setting::BooleanSetting.new("queue.checkpoint.retry", false),
LogStash::Setting::StringSetting.new("pipeline.id", pipeline_id),
+ LogStash::Setting::StringSetting.new("pipeline.batch.metrics.sampling_mode", "minimal", true, ["disabled", "minimal", "full"]),
LogStash::Setting::PositiveIntegerSetting.new("pipeline.batch.size", 125),
LogStash::Setting::PositiveIntegerSetting.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum)
]
diff --git a/logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb b/logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb
index 67093a78eea..19934c7892e 100644
--- a/logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb
+++ b/logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb
@@ -64,7 +64,7 @@
.build
end
- let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings) }
+ let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings, org.logstash.ackedqueue.QueueFactoryExt::BatchMetricMode::DISABLED) }
after do
queue.close
diff --git a/logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb b/logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb
index 9e644195806..2f4b771e00d 100644
--- a/logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb
+++ b/logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb
@@ -19,7 +19,7 @@
require "logstash/instrument/collector"
describe LogStash::WrappedSynchronousQueue do
- subject {LogStash::WrappedSynchronousQueue.new(5)}
+ subject {LogStash::WrappedSynchronousQueue.new(5, org.logstash.ackedqueue.QueueFactoryExt::BatchMetricMode::DISABLED)}
describe "queue clients" do
context "when requesting a write client" do
diff --git a/logstash-core/src/main/java/org/logstash/ConvertedMap.java b/logstash-core/src/main/java/org/logstash/ConvertedMap.java
index c9f0da606e9..c815679d4ea 100644
--- a/logstash-core/src/main/java/org/logstash/ConvertedMap.java
+++ b/logstash-core/src/main/java/org/logstash/ConvertedMap.java
@@ -21,14 +21,26 @@
package org.logstash;
import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+
+import org.jruby.RubyBignum;
+import org.jruby.RubyBoolean;
+import org.jruby.RubyFixnum;
+import org.jruby.RubyFloat;
import org.jruby.RubyHash;
+import org.jruby.RubyNil;
import org.jruby.RubyString;
+import org.jruby.RubySymbol;
+import org.jruby.ext.bigdecimal.RubyBigDecimal;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
+import org.logstash.ext.JrubyTimestampExtLibrary;
/**
*
This class is an internal API and behaves very different from a standard {@link Map}.
@@ -41,7 +53,7 @@
* intern pool to ensure identity match of equivalent strings.
* For performance, we keep a global cache of strings that have been interned for use with {@link ConvertedMap},
* and encourage interning through {@link ConvertedMap#internStringForUseAsKey(String)} to avoid
- * the performance pentalty of the global string intern pool.
+ * the performance penalty of the global string intern pool.
*/
public final class ConvertedMap extends IdentityHashMap {
@@ -157,4 +169,112 @@ public Object unconvert() {
private static String convertKey(final RubyString key) {
return internStringForUseAsKey(key.asJavaString());
}
+
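+ /**
+  * @return a rough estimate, in bytes, of the payload carried by this map, obtained by summing
+  * the estimated size of every value and recursing into nested collections and maps.
+  */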
+ public long estimateMemory() {
+ return values().stream()
+ .map(this::estimateMemory)
+ .mapToLong(Long::longValue)
+ .sum();
+ }
+
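+ /**
+  * Best-effort byte-size estimate of a single field value; recurses into collections and maps
+  * and throws IllegalArgumentException for types it does not recognize.
+  */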
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private long estimateMemory(Object o) {
+ if (o instanceof Boolean) {
+ return Byte.BYTES;
+ }
+ if (o instanceof Byte) {
+ return Byte.BYTES;
+ }
+ if (o instanceof Short) {
+ return Short.BYTES;
+ }
+ if (o instanceof Integer) {
+ return Integer.BYTES;
+ }
+ if (o instanceof Long) {
+ return Long.BYTES;
+ }
+ if (o instanceof Float) {
+ return Float.BYTES;
+ }
+ if (o instanceof Double) {
+ return Double.BYTES;
+ }
+ if (o instanceof Character) {
+ return Character.BYTES;
+ }
+ if (o instanceof String) {
+ return ((String) o).getBytes().length;
+ }
+ if (o instanceof RubyString) {
+ return ((RubyString) o).getBytes().length;
+ }
+
+ if (o instanceof Collection) {
+ Collection c = (Collection) o;
+ long memory = 0L;
+ for (Object v : c) {
+ memory += estimateMemory(v);
+ }
+ return memory;
+ }
+
+ if (o instanceof ConvertedMap) {
+ ConvertedMap c = (ConvertedMap) o;
+ return c.estimateMemory();
+ }
+
+ if (o instanceof Map) {
+ // this case shouldn't happen because all Maps are converted to ConvertedMap
+ Map m = (Map) o;
+ long memory = 0L;
+ for (Map.Entry e : m.entrySet()) {
+ memory += estimateMemory(e.getKey());
+ memory += estimateMemory(e.getValue());
+ }
+ return memory;
+ }
+ if (o instanceof JrubyTimestampExtLibrary.RubyTimestamp) {
+ // wraps a java.time.Instant, which is made of a long and an int
+ return Long.BYTES + Integer.BYTES;
+ }
+ if (o instanceof BigInteger) {
+ return ((BigInteger) o).toByteArray().length;
+ }
+ if (o instanceof BigDecimal) {
+ // BigDecimal has 4 fields: a BigInteger reference, two ints (scale and precision), and a long
+ return 8 + 2 * Integer.BYTES + Long.BYTES;
+ }
+ if (o instanceof RubyBignum) {
+ RubyBignum rbn = (RubyBignum) o;
+ return ((RubyFixnum) rbn.size()).getLongValue();
+ }
+ if (o instanceof RubyBigDecimal) {
+ RubyBigDecimal rbd = (RubyBigDecimal) o;
+ // wraps a Java BigDecimal so we can return the size of that:
+ return estimateMemory(rbd.getValue());
+ }
+ if (o instanceof RubyFixnum) {
+ // like an int value
+ return Integer.BYTES;
+ }
+ if (o instanceof RubyBoolean) {
+ return Byte.BYTES;
+ }
+ if (o instanceof RubyNil) {
+ return 8 + Integer.BYTES; // object reference, one int
+ }
+ if (o instanceof RubySymbol) {
+ return estimateMemory(((RubySymbol) o).asJavaString());
+ }
+ if (o instanceof RubyFloat) {
+ return Double.BYTES;
+ }
+
+ throw new IllegalArgumentException(
+ "Unsupported type encountered in estimateMemory: " + o.getClass().getName() +
+ ". Please ensure all objects passed to estimateMemory are of supported types. " +
+ "Refer to the ConvertedMap.estimateMemory method for the list of supported types."
+ );
+ }
}
diff --git a/logstash-core/src/main/java/org/logstash/Event.java b/logstash-core/src/main/java/org/logstash/Event.java
index e1e9f4db1f2..2a9f79ab9b5 100644
--- a/logstash-core/src/main/java/org/logstash/Event.java
+++ b/logstash-core/src/main/java/org/logstash/Event.java
@@ -529,6 +529,7 @@ private void initFailTag(final Object tag) {
* and needs to be converted to a list before appending to it.
* @param existing Existing Tag
* @param tag Tag to add
+ *
*/
private void scalarTagFallback(final String existing, final String tag) {
final List tags = new ArrayList<>(2);
@@ -567,4 +568,16 @@ private static String getCanonicalFieldReference(final FieldReference field) {
return path.stream().collect(Collectors.joining("][", "[", "]"));
}
}
+
+ /**
+ * @return a byte size estimation of the event, based on the payloads carried by nested data structures,
+ * without considering the space needed by the JVM to represent the object itself.
+ *
+ */
+ public long estimateMemory() {
+ long total = 0;
+ total += data.estimateMemory();
+ total += metadata.estimateMemory();
+ return total;
+ }
}
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java
index 6a10c2a3e7f..c44560ad7c6 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java
@@ -46,6 +46,12 @@
@JRubyClass(name = "QueueFactory")
public final class QueueFactoryExt extends RubyBasicObject {
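+ /** Controls how batch-level metrics are sampled: DISABLED collects none, MINIMAL samples a small subset of batches, FULL measures every batch. */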
+ public enum BatchMetricMode {
+ DISABLED,
+ MINIMAL,
+ FULL
+ }
+
/**
* A static value to indicate Persistent Queue is enabled.
*/
@@ -71,6 +77,7 @@ public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) {
public static AbstractWrappedQueueExt create(final ThreadContext context, final IRubyObject recv,
final IRubyObject settings) throws IOException {
final String type = getSetting(context, settings, QUEUE_TYPE_CONTEXT_NAME).asJavaString();
+ final BatchMetricMode batchMetricMode = decodeBatchMetricMode(context, settings);
if (PERSISTED_TYPE.equals(type)) {
final Settings queueSettings = extractQueueSettings(settings);
final Path queuePath = Paths.get(queueSettings.getDirPath());
@@ -81,18 +88,14 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final
Files.createDirectories(queuePath);
}
- return JRubyWrappedAckedQueueExt.create(context, queueSettings);
+ return JRubyWrappedAckedQueueExt.create(context, queueSettings, batchMetricMode);
} else if (MEMORY_TYPE.equals(type)) {
- return new JrubyWrappedSynchronousQueueExt(
- context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS
- ).initialize(
- context, context.runtime.newFixnum(
- getSetting(context, settings, SettingKeyDefinitions.PIPELINE_BATCH_SIZE)
- .convertToInteger().getIntValue()
- * getSetting(context, settings, SettingKeyDefinitions.PIPELINE_WORKERS)
- .convertToInteger().getIntValue()
- )
- );
+ final int batchSize = getSetting(context, settings, SettingKeyDefinitions.PIPELINE_BATCH_SIZE)
+ .convertToInteger().getIntValue();
+ final int workers = getSetting(context, settings, SettingKeyDefinitions.PIPELINE_WORKERS)
+ .convertToInteger().getIntValue();
+ int queueSize = batchSize * workers;
+ return JrubyWrappedSynchronousQueueExt.create(context, queueSize, batchMetricMode);
} else {
throw context.runtime.newRaiseException(
RubyUtil.CONFIGURATION_ERROR_CLASS,
@@ -104,6 +107,16 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final
}
}
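+ /** Maps the pipeline.batch.metrics.sampling_mode setting onto a BatchMetricMode, defaulting to DISABLED when the setting is absent or empty. */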
+ private static BatchMetricMode decodeBatchMetricMode(ThreadContext context, IRubyObject settings) {
+ final String batchMetricModeStr = getSetting(context, settings, SettingKeyDefinitions.PIPELINE_BATCH_METRICS)
+ .asJavaString();
+
+ if (batchMetricModeStr == null || batchMetricModeStr.isEmpty()) {
+ return BatchMetricMode.DISABLED;
+ }
+ return BatchMetricMode.valueOf(batchMetricModeStr.toUpperCase());
+ }
+
private static IRubyObject getSetting(final ThreadContext context, final IRubyObject settings,
final String name) {
return settings.callMethod(context, "get_value", context.runtime.newString(name));
@@ -115,6 +128,7 @@ private static Settings extractQueueSettings(final IRubyObject settings) {
getSetting(context, settings, PATH_QUEUE).asJavaString(),
getSetting(context, settings, PIPELINE_ID).asJavaString()
);
+
return SettingsImpl.fileSettingsBuilder(queuePath.toString())
.elementClass(Event.class)
.capacity(getSetting(context, settings, QUEUE_PAGE_CAPACITY).toJava(Integer.class))
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java
index 1623738659e..212945ef07f 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java
@@ -90,6 +90,5 @@ interface Builder {
Builder checkpointRetry(boolean checkpointRetry);
Settings build();
-
}
}
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java
index 35cb765dbc9..770824935ce 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java
@@ -44,7 +44,7 @@
import org.logstash.ackedqueue.SettingsImpl;
/**
- * JRuby extension to wrap a persistent queue istance.
+ * JRuby extension to wrap a persistent queue instance.
*/
@JRubyClass(name = "AckedQueue")
public final class JRubyAckedQueueExt extends RubyObject {
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java
index d2d374b56f7..42d11da8875 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java
@@ -21,6 +21,7 @@
package org.logstash.ackedqueue.ext;
import java.io.IOException;
+import java.util.Objects;
import org.jruby.Ruby;
import org.jruby.RubyBoolean;
@@ -32,6 +33,7 @@
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.ackedqueue.Settings;
+import org.logstash.ackedqueue.QueueFactoryExt;
import org.logstash.execution.AbstractWrappedQueueExt;
import org.logstash.execution.QueueReadClientBase;
import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
@@ -48,9 +50,10 @@ public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt {
private static final long serialVersionUID = 1L;
private JRubyAckedQueueExt queue;
+ private QueueFactoryExt.BatchMetricMode batchMetricMode;
- @JRubyMethod(required=1)
- public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject settings) throws IOException {
+ @JRubyMethod(required=2)
+ public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject settings, IRubyObject batchMetricMode) throws IOException {
if (!JavaUtil.isJavaObject(settings)) {
// We should never get here, but previously had an initialize method
// that took 7 technically-optional ordered parameters.
@@ -60,18 +63,33 @@ public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject s
settings.getClass().getName(),
settings));
}
- this.queue = JRubyAckedQueueExt.create(JavaUtil.unwrapJavaObject(settings));
+
+ Objects.requireNonNull(batchMetricMode, "batchMetricMode setting must be non-null");
+ if (!JavaUtil.isJavaObject(batchMetricMode)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Failed to instantiate JRubyWrappedAckedQueueExt with <%s:%s>",
+ batchMetricMode.getClass().getName(),
+ batchMetricMode));
+ }
+
+ Settings javaSettings = JavaUtil.unwrapJavaObject(settings);
+ this.queue = JRubyAckedQueueExt.create(javaSettings);
+
+ this.batchMetricMode = JavaUtil.unwrapJavaObject(batchMetricMode);
this.queue.open();
return this;
}
- public static JRubyWrappedAckedQueueExt create(ThreadContext context, Settings settings) throws IOException {
- return new JRubyWrappedAckedQueueExt(context.runtime, RubyUtil.WRAPPED_ACKED_QUEUE_CLASS, settings);
+ public static JRubyWrappedAckedQueueExt create(ThreadContext context, Settings settings, QueueFactoryExt.BatchMetricMode batchMetricMode) throws IOException {
+ return new JRubyWrappedAckedQueueExt(context.runtime, RubyUtil.WRAPPED_ACKED_QUEUE_CLASS, settings, batchMetricMode);
}
- public JRubyWrappedAckedQueueExt(Ruby runtime, RubyClass metaClass, Settings settings) throws IOException {
+ public JRubyWrappedAckedQueueExt(Ruby runtime, RubyClass metaClass, Settings settings, QueueFactoryExt.BatchMetricMode batchMetricMode) throws IOException {
super(runtime, metaClass);
+ this.batchMetricMode = Objects.requireNonNull(batchMetricMode, "batchMetricMode setting must be non-null");
this.queue = JRubyAckedQueueExt.create(settings);
this.queue.open();
}
@@ -111,7 +129,7 @@ protected JRubyAbstractQueueWriteClientExt getWriteClient(final ThreadContext co
@Override
protected QueueReadClientBase getReadClient() {
- return JrubyAckedReadClientExt.create(queue);
+ return JrubyAckedReadClientExt.create(queue, batchMetricMode);
}
@Override
diff --git a/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java b/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java
index 7c5c47e3162..1e9eabf0a61 100644
--- a/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java
+++ b/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java
@@ -28,6 +28,8 @@ public class SettingKeyDefinitions {
public static final String PIPELINE_WORKERS = "pipeline.workers";
+ public static final String PIPELINE_BATCH_METRICS = "pipeline.batch.metrics.sampling_mode";
+
public static final String PIPELINE_BATCH_SIZE = "pipeline.batch.size";
public static final String PATH_QUEUE = "path.queue";
diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java
index ec26b9b0735..ef12af32cd6 100644
--- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java
+++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java
@@ -317,8 +317,7 @@ public final IRubyObject openQueue(final ThreadContext context) {
new IRubyObject[]{
STATS_KEY,
PIPELINES_KEY,
- pipelineId.convertToString().intern(),
- EVENTS_KEY
+ pipelineId.convertToString().intern()
}
)
)
@@ -585,11 +584,39 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) {
this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, utilizationFlow);
storeMetric(context, flowNamespace, utilizationFlow);
+ // Batch average byte size and count metrics
+ if (isBatchMetricsEnabled(context)) {
+ initializeBatchMetrics(context);
+ }
+
initializePqFlowMetrics(context, flowNamespace, uptimeMetric);
initializePluginFlowMetrics(context, uptimeMetric);
return context.nil;
}
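+ /**
+  * Registers two lifetime flow metrics under the pipeline's :batch namespace:
+  * average events per batch (total_events / count) and average bytes per batch (total_bytes / count).
+  */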
+ private void initializeBatchMetrics(ThreadContext context) {
+ final RubySymbol[] batchNamespace = buildNamespace(BATCH_KEY, BATCH_EVENT_COUNT_KEY);
+ final LongCounter batchEventsInCounter = initOrGetCounterMetric(context, buildNamespace(BATCH_KEY), BATCH_TOTAL_EVENTS);
+ final LongCounter batchCounter = initOrGetCounterMetric(context, buildNamespace(BATCH_KEY), BATCH_COUNT);
+ final FlowMetric eventsPerBatch = createFlowMetric(BATCH_AVERAGE_KEY, batchEventsInCounter, batchCounter);
+ this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, eventsPerBatch);
+ storeMetric(context, batchNamespace, eventsPerBatch);
+
+ final RubySymbol[] batchSizeNamespace = buildNamespace(BATCH_KEY, BATCH_BYTE_SIZE_KEY);
+ final LongCounter totalBytes = initOrGetCounterMetric(context, buildNamespace(BATCH_KEY), BATCH_TOTAL_BYTES);
+ final FlowMetric byteSizePerBatch = createFlowMetric(BATCH_AVERAGE_KEY, totalBytes, batchCounter);
+ this.scopedFlowMetrics.register(ScopedFlowMetrics.Scope.WORKER, byteSizePerBatch);
+ storeMetric(context, batchSizeNamespace, byteSizePerBatch);
+ }
+
+ private boolean isBatchMetricsEnabled(ThreadContext context) {
+ IRubyObject pipelineBatchMetricsSetting = getSetting(context, "pipeline.batch.metrics.sampling_mode");
+ return !pipelineBatchMetricsSetting.isNil() &&
+ QueueFactoryExt.BatchMetricMode.valueOf(
+ pipelineBatchMetricsSetting.asJavaString().toUpperCase()
+ ) != QueueFactoryExt.BatchMetricMode.DISABLED;
+ }
+
@JRubyMethod(name = "collect_flow_metrics")
public final IRubyObject collectFlowMetrics(final ThreadContext context) {
this.scopedFlowMetrics.captureAll();
diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBase.java b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBase.java
index 535cd838a0e..aba1377ca5d 100644
--- a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBase.java
+++ b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBase.java
@@ -32,15 +32,19 @@
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
+import org.logstash.ackedqueue.QueueFactoryExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.timer.TimerMetric;
import org.logstash.instrument.metrics.counter.LongCounter;
import java.io.IOException;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import static org.logstash.instrument.metrics.MetricKeys.EVENTS_KEY;
+
/**
* Common code shared by Persistent and In-Memory queues clients implementation
* */
@@ -60,9 +64,17 @@ public abstract class QueueReadClientBase extends RubyObject implements QueueRea
private transient LongCounter pipelineMetricOut;
private transient LongCounter pipelineMetricFiltered;
private transient TimerMetric pipelineMetricTime;
+ private final transient QueueReadClientBatchMetrics batchMetrics;
protected QueueReadClientBase(final Ruby runtime, final RubyClass metaClass) {
+ this(runtime, metaClass, QueueFactoryExt.BatchMetricMode.DISABLED);
+ }
+
+ protected QueueReadClientBase(final Ruby runtime, final RubyClass metaClass,
+ final QueueFactoryExt.BatchMetricMode batchMetricMode) {
super(runtime, metaClass);
+ Objects.requireNonNull(batchMetricMode, "batchMetricMode must not be null");
+ this.batchMetrics = new QueueReadClientBatchMetrics(batchMetricMode);
}
@JRubyMethod(name = "inflight_batches")
@@ -86,10 +98,13 @@ public IRubyObject setEventsMetric(final IRubyObject metric) {
@JRubyMethod(name = "set_pipeline_metric")
public IRubyObject setPipelineMetric(final IRubyObject metric) {
final AbstractNamespacedMetricExt namespacedMetric = (AbstractNamespacedMetricExt) metric;
+ ThreadContext context = metric.getRuntime().getCurrentContext();
+ AbstractNamespacedMetricExt eventsNamespace = namespacedMetric.namespace(context, EVENTS_KEY);
synchronized(namespacedMetric.getMetric()) {
- pipelineMetricOut = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.OUT_KEY);
- pipelineMetricFiltered = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.FILTERED_KEY);
- pipelineMetricTime = TimerMetric.fromRubyBase(namespacedMetric, MetricKeys.DURATION_IN_MILLIS_KEY);
+ pipelineMetricOut = LongCounter.fromRubyBase(eventsNamespace, MetricKeys.OUT_KEY);
+ pipelineMetricFiltered = LongCounter.fromRubyBase(eventsNamespace, MetricKeys.FILTERED_KEY);
+ pipelineMetricTime = TimerMetric.fromRubyBase(eventsNamespace, MetricKeys.DURATION_IN_MILLIS_KEY);
+ batchMetrics.setupMetrics(namespacedMetric);
}
return this;
}
@@ -193,6 +208,7 @@ public void startMetrics(QueueBatch batch) {
// JTODO getId has been deprecated in JDK 19, when JDK 21 is the target version use threadId() instead
long threadId = Thread.currentThread().getId();
inflightBatches.put(threadId, batch);
+ batchMetrics.updateBatchMetrics(batch);
}
@Override
diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java
new file mode 100644
index 00000000000..3ed58daaef2
--- /dev/null
+++ b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java
@@ -0,0 +1,76 @@
+package org.logstash.execution;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jruby.runtime.ThreadContext;
+import org.logstash.ackedqueue.QueueFactoryExt;
+import org.logstash.ext.JrubyEventExtLibrary;
+import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
+import org.logstash.instrument.metrics.counter.LongCounter;
+
+import java.security.SecureRandom;
+
+import static org.logstash.instrument.metrics.MetricKeys.*;
+
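+/**
+ * Tracks per-batch metrics (batch count, total events, total estimated byte size) for a queue
+ * read client, applying the configured {@link QueueFactoryExt.BatchMetricMode} sampling policy.
+ */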
+class QueueReadClientBatchMetrics {
+
+ private static final Logger LOG = LogManager.getLogger(QueueReadClientBatchMetrics.class);
+
+ private final QueueFactoryExt.BatchMetricMode batchMetricMode;
+
+ private LongCounter pipelineMetricBatchCount;
+ private LongCounter pipelineMetricBatchByteSize;
+ private LongCounter pipelineMetricBatchTotalEvents;
+ private final SecureRandom random = new SecureRandom();
+
+ public QueueReadClientBatchMetrics(QueueFactoryExt.BatchMetricMode batchMetricMode) {
+ this.batchMetricMode = batchMetricMode;
+ }
+
+ public void setupMetrics(AbstractNamespacedMetricExt namespacedMetric) {
+ LOG.debug("setupMetrics called with mode: {}", batchMetricMode);
+ ThreadContext context = namespacedMetric.getRuntime().getCurrentContext();
+ AbstractNamespacedMetricExt batchNamespace = namespacedMetric.namespace(context, BATCH_KEY);
+ if (batchMetricMode != QueueFactoryExt.BatchMetricMode.DISABLED) {
+ pipelineMetricBatchCount = LongCounter.fromRubyBase(batchNamespace, BATCH_COUNT);
+ pipelineMetricBatchTotalEvents = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_EVENTS);
+ pipelineMetricBatchByteSize = LongCounter.fromRubyBase(batchNamespace, BATCH_TOTAL_BYTES);
+ }
+ }
+
+ public void updateBatchMetrics(QueueBatch batch) {
+ if (batch.events().isEmpty()) {
+ // avoid incrementing the batch count for empty batches
+ return;
+ }
+
+ if (batchMetricMode == QueueFactoryExt.BatchMetricMode.DISABLED) {
+ return;
+ }
+
+ boolean updateMetric = true;
+ if (batchMetricMode == QueueFactoryExt.BatchMetricMode.MINIMAL) {
+ // ~2% chance to update the metric (nextInt(100) < 2)
+ updateMetric = random.nextInt(100) < 2;
+ }
+
+ if (updateMetric) {
+ updateBatchSizeMetric(batch);
+ }
+ }
+
+ private void updateBatchSizeMetric(QueueBatch batch) {
+ try {
+ // if an error occurs while estimating the size of the batch, no counter is updated
+ long totalSize = 0L;
+ for (JrubyEventExtLibrary.RubyEvent rubyEvent : batch.events()) {
+ totalSize += rubyEvent.getEvent().estimateMemory();
+ }
+ pipelineMetricBatchCount.increment();
+ pipelineMetricBatchTotalEvents.increment(batch.filteredSize());
+ pipelineMetricBatchByteSize.increment(totalSize);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Failed to calculate batch byte size for metrics", e);
+ }
+ }
+}
diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyAckedReadClientExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyAckedReadClientExt.java
index 273fa6cb2b2..3f4f1b62a44 100644
--- a/logstash-core/src/main/java/org/logstash/ext/JrubyAckedReadClientExt.java
+++ b/logstash-core/src/main/java/org/logstash/ext/JrubyAckedReadClientExt.java
@@ -28,6 +28,7 @@
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.ackedqueue.AckedReadBatch;
+import org.logstash.ackedqueue.QueueFactoryExt;
import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
import org.logstash.execution.QueueBatch;
import org.logstash.execution.QueueReadClient;
@@ -49,12 +50,12 @@ public final class JrubyAckedReadClientExt extends QueueReadClientBase implement
public static JrubyAckedReadClientExt create(final ThreadContext context,
final IRubyObject recv, final IRubyObject queue) {
return new JrubyAckedReadClientExt(
- context.runtime, RubyUtil.ACKED_READ_CLIENT_CLASS, queue
+ context.runtime, RubyUtil.ACKED_READ_CLIENT_CLASS, queue, QueueFactoryExt.BatchMetricMode.DISABLED
);
}
- public static JrubyAckedReadClientExt create(IRubyObject queue) {
- return new JrubyAckedReadClientExt(RubyUtil.RUBY, RubyUtil.ACKED_READ_CLIENT_CLASS, queue);
+ public static JrubyAckedReadClientExt create(IRubyObject queue, QueueFactoryExt.BatchMetricMode batchMetricMode) {
+ return new JrubyAckedReadClientExt(RubyUtil.RUBY, RubyUtil.ACKED_READ_CLIENT_CLASS, queue, batchMetricMode);
}
public JrubyAckedReadClientExt(final Ruby runtime, final RubyClass metaClass) {
@@ -62,8 +63,8 @@ public JrubyAckedReadClientExt(final Ruby runtime, final RubyClass metaClass) {
}
private JrubyAckedReadClientExt(final Ruby runtime, final RubyClass metaClass,
- final IRubyObject queue) {
- super(runtime, metaClass);
+ final IRubyObject queue, final QueueFactoryExt.BatchMetricMode batchMetricMode) {
+ super(runtime, metaClass, batchMetricMode);
this.queue = (JRubyAckedQueueExt)queue;
}
diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java
index 0a93a347c4d..8485d0864d1 100644
--- a/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java
+++ b/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java
@@ -26,6 +26,7 @@
import org.jruby.RubyClass;
import org.jruby.anno.JRubyClass;
import org.logstash.RubyUtil;
+import org.logstash.ackedqueue.QueueFactoryExt;
import org.logstash.common.LsQueueUtils;
import org.logstash.execution.MemoryReadBatch;
import org.logstash.execution.QueueBatch;
@@ -47,8 +48,9 @@ public JrubyMemoryReadClientExt(final Ruby runtime, final RubyClass metaClass) {
@SuppressWarnings("rawtypes")
private JrubyMemoryReadClientExt(final Ruby runtime, final RubyClass metaClass,
- BlockingQueue queue, int batchSize, int waitForMillis) {
- super(runtime, metaClass);
+ BlockingQueue queue, int batchSize, int waitForMillis,
+ QueueFactoryExt.BatchMetricMode batchMetricMode) {
+ super(runtime, metaClass, batchMetricMode);
this.queue = queue;
this.batchSize = batchSize;
this.waitForNanos = TimeUnit.NANOSECONDS.convert(waitForMillis, TimeUnit.MILLISECONDS);
@@ -58,8 +60,15 @@ private JrubyMemoryReadClientExt(final Ruby runtime, final RubyClass metaClass,
@SuppressWarnings("rawtypes")
public static JrubyMemoryReadClientExt create(BlockingQueue queue, int batchSize,
int waitForMillis) {
+ return create(queue, batchSize, waitForMillis, QueueFactoryExt.BatchMetricMode.DISABLED);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public static JrubyMemoryReadClientExt create(BlockingQueue queue, int batchSize,
+ int waitForMillis,
+ QueueFactoryExt.BatchMetricMode batchMetricMode) {
return new JrubyMemoryReadClientExt(RubyUtil.RUBY,
- RubyUtil.MEMORY_READ_CLIENT_CLASS, queue, batchSize, waitForMillis);
+ RubyUtil.MEMORY_READ_CLIENT_CLASS, queue, batchSize, waitForMillis, batchMetricMode);
}
@Override
diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java
index dbbfb97de5b..ebebae00cbe 100644
--- a/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java
+++ b/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java
@@ -20,6 +20,7 @@
package org.logstash.ext;
+import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -28,8 +29,11 @@
import org.jruby.RubyNumeric;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
+import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
+import org.logstash.RubyUtil;
+import org.logstash.ackedqueue.QueueFactoryExt;
import org.logstash.execution.AbstractWrappedQueueExt;
import org.logstash.execution.QueueReadClientBase;
@@ -42,20 +46,44 @@ public final class JrubyWrappedSynchronousQueueExt extends AbstractWrappedQueueE
private static final long serialVersionUID = 1L;
private transient BlockingQueue queue;
+ private QueueFactoryExt.BatchMetricMode batchMetricMode;
public JrubyWrappedSynchronousQueueExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
+ private JrubyWrappedSynchronousQueueExt(final Ruby runtime, final RubyClass metaClass,
+ int size, QueueFactoryExt.BatchMetricMode batchMetricMode) {
+ super(runtime, metaClass);
+ this.queue = new ArrayBlockingQueue<>(size);
+ this.batchMetricMode = Objects.requireNonNull(batchMetricMode, "batchMetricMode setting must be non-null");
+ }
+
@JRubyMethod
@SuppressWarnings("unchecked")
public JrubyWrappedSynchronousQueueExt initialize(final ThreadContext context,
- IRubyObject size) {
+ IRubyObject size,
+ IRubyObject batchMetricMode) {
+ Objects.requireNonNull(batchMetricMode, "batchMetricMode setting must be non-null");
+ if (!JavaUtil.isJavaObject(batchMetricMode)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Failed to instantiate JrubyWrappedSynchronousQueueExt with <%s:%s>",
+ batchMetricMode.getClass().getName(),
+ batchMetricMode));
+ }
+
int typedSize = ((RubyNumeric)size).getIntValue();
this.queue = new ArrayBlockingQueue<>(typedSize);
+ this.batchMetricMode = JavaUtil.unwrapJavaObject(batchMetricMode);
return this;
}
+ public static JrubyWrappedSynchronousQueueExt create(final ThreadContext context, int size,
+ QueueFactoryExt.BatchMetricMode batchMetricMode) {
+ return new JrubyWrappedSynchronousQueueExt(context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS, size, batchMetricMode);
+ }
+
@Override
protected JRubyAbstractQueueWriteClientExt getWriteClient(final ThreadContext context) {
return JrubyMemoryWriteClientExt.create(queue);
@@ -65,7 +93,7 @@ protected JRubyAbstractQueueWriteClientExt getWriteClient(final ThreadContext co
protected QueueReadClientBase getReadClient() {
// batch size and timeout are currently hard-coded to 125 and 50ms as values observed
// to be reasonable tradeoffs between latency and throughput per PR #8707
- return JrubyMemoryReadClientExt.create(queue, 125, 50);
+ return JrubyMemoryReadClientExt.create(queue, 125, 50, batchMetricMode);
}
@Override
diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java
index 2730ab83749..15f540b3dca 100644
--- a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java
+++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java
@@ -116,4 +116,19 @@ private MetricKeys() {
public static final RubySymbol WRITES_IN_KEY = RubyUtil.RUBY.newSymbol("writes_in");
+ // Batch metrics keys
+ public static final RubySymbol BATCH_EVENT_COUNT_KEY = RubyUtil.RUBY.newSymbol("event_count");
+
+ public static final RubySymbol BATCH_AVERAGE_KEY = RubyUtil.RUBY.newSymbol("average");
+
+ public static final RubySymbol BATCH_KEY = RubyUtil.RUBY.newSymbol("batch");
+
+ public static final RubySymbol BATCH_COUNT = RubyUtil.RUBY.newSymbol("count");
+
+ public static final RubySymbol BATCH_TOTAL_EVENTS = RubyUtil.RUBY.newSymbol("total_events");
+
+ public static final RubySymbol BATCH_TOTAL_BYTES = RubyUtil.RUBY.newSymbol("total_bytes");
+
+ public static final RubySymbol BATCH_BYTE_SIZE_KEY = RubyUtil.RUBY.newSymbol("byte_size");
+
}
diff --git a/logstash-core/src/test/java/org/logstash/execution/QueueReadClientBatchMetricsTest.java b/logstash-core/src/test/java/org/logstash/execution/QueueReadClientBatchMetricsTest.java
new file mode 100644
index 00000000000..588064e41af
--- /dev/null
+++ b/logstash-core/src/test/java/org/logstash/execution/QueueReadClientBatchMetricsTest.java
@@ -0,0 +1,133 @@
+package org.logstash.execution;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import org.jruby.RubyArray;
+import org.jruby.runtime.ThreadContext;
+import org.jruby.runtime.builtin.IRubyObject;
+import org.junit.Before;
+import org.junit.Test;
+import org.logstash.Event;
+import org.logstash.RubyUtil;
+import org.logstash.ackedqueue.QueueFactoryExt;
+import org.logstash.ext.JrubyEventExtLibrary;
+import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
+import org.logstash.instrument.metrics.MetricKeys;
+import org.logstash.instrument.metrics.MockNamespacedMetric;
+import org.logstash.instrument.metrics.counter.LongCounter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class QueueReadClientBatchMetricsTest {
+
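+ /** Minimal QueueBatch stub that wraps a fixed list of events for metric tests. */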
+ public static final class MockQueueBatch implements QueueBatch {
+
+ private final long processingTimeNanos;
+ private final List events;
+
+ public MockQueueBatch(long processingTimeNanos, JrubyEventExtLibrary.RubyEvent... events) {
+ this.processingTimeNanos = processingTimeNanos;
+ this.events = Arrays.stream(events).toList();
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public RubyArray to_a() {
+ List list = new ArrayList<>(events);
+ return (RubyArray) RubyUtil.RUBY.newArray(list);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Collection events() {
+ return to_a();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no-op
+ }
+
+ @Override
+ public int filteredSize() {
+ return events.size();
+ }
+
+ public long getProcessingTimeNanos() {
+ return processingTimeNanos;
+ }
+ }
+
+ private AbstractNamespacedMetricExt metric;
+ private QueueReadClientBatchMetrics sut;
+ private LongCounter batchCounter;
+ private LongCounter batchByteSizeCounter;
+ private JrubyEventExtLibrary.RubyEvent rubyEvent;
+
+ @Before
+ public void setUp() {
+ metric = MockNamespacedMetric.create();
+ sut = new QueueReadClientBatchMetrics(QueueFactoryExt.BatchMetricMode.FULL);
+ sut.setupMetrics(metric);
+
+ ThreadContext context = metric.getRuntime().getCurrentContext();
+ batchCounter = LongCounter.fromRubyBase(metric.namespace(context, MetricKeys.BATCH_KEY), MetricKeys.BATCH_COUNT);
+ batchByteSizeCounter = LongCounter.fromRubyBase(metric.namespace(context, MetricKeys.BATCH_KEY), MetricKeys.BATCH_TOTAL_BYTES);
+
+ rubyEvent = JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event());
+ }
+
+ @Test
+ public void givenEmptyBatchAndFullMetricsWhenUpdateBatchMetricsThenNoMetricsAreUpdated() {
+ QueueBatch emptyBatch = new MockQueueBatch(10);
+
+ sut.updateBatchMetrics(emptyBatch);
+
+ assertEquals(0L, batchCounter.getValue().longValue());
+ }
+
+ @Test
+ public void givenNonEmptyBatchAndFullMetricsWhenUpdateBatchMetricsThenMetricsAreUpdated() {
+ QueueBatch batch = new MockQueueBatch(10, rubyEvent);
+ final long expectedBatchByteSize = rubyEvent.getEvent().estimateMemory();
+
+ sut.updateBatchMetrics(batch);
+
+ assertEquals(1L, batchCounter.getValue().longValue());
+ assertEquals(expectedBatchByteSize, batchByteSizeCounter.getValue().longValue());
+ }
+
+ @Test
+ public void givenNonEmptyBatchesAndMinimalMetricsThenMetricsAreUpdated() {
+ sut = new QueueReadClientBatchMetrics(QueueFactoryExt.BatchMetricMode.MINIMAL);
+ sut.setupMetrics(metric);
+
+ QueueBatch batch = new MockQueueBatch(10, rubyEvent);
+ final long expectedBatchByteSize = rubyEvent.getEvent().estimateMemory();
+
+ for (int i = 0; i < 200; i++) {
+ sut.updateBatchMetrics(batch);
+ }
+ sut.updateBatchMetrics(batch);
+
+ assertThat(batchCounter.getValue(), org.hamcrest.Matchers.greaterThan(1L));
+ assertThat(batchByteSizeCounter.getValue(), org.hamcrest.Matchers.greaterThan(expectedBatchByteSize));
+ }
+
+ @Test
+ public void givenNonEmptyBatchAndDisabledMetricsWhenUpdateBatchMetricsThenNoMetricsAreUpdated() {
+ sut = new QueueReadClientBatchMetrics(QueueFactoryExt.BatchMetricMode.DISABLED);
+ sut.setupMetrics(metric);
+ QueueBatch batch = new MockQueueBatch(10, rubyEvent);
+
+ sut.updateBatchMetrics(batch);
+
+ assertEquals(0L, batchCounter.getValue().longValue());
+ assertEquals(0L, batchByteSizeCounter.getValue().longValue());
+ }
+}
\ No newline at end of file
diff --git a/logstash-core/src/test/java/org/logstash/ext/JrubyMemoryReadClientExtTest.java b/logstash-core/src/test/java/org/logstash/ext/JrubyMemoryReadClientExtTest.java
index dff3c4845a3..386aa628f2b 100644
--- a/logstash-core/src/test/java/org/logstash/ext/JrubyMemoryReadClientExtTest.java
+++ b/logstash-core/src/test/java/org/logstash/ext/JrubyMemoryReadClientExtTest.java
@@ -25,26 +25,50 @@
import java.util.concurrent.BlockingQueue;
import org.jruby.RubyHash;
import org.jruby.runtime.ThreadContext;
+import org.junit.Before;
import org.junit.Test;
+import org.logstash.Event;
import org.logstash.RubyTestBase;
+import org.logstash.RubyUtil;
+import org.logstash.ackedqueue.QueueFactoryExt;
import org.logstash.execution.QueueBatch;
+import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
+import org.logstash.instrument.metrics.MetricKeys;
+import org.logstash.instrument.metrics.MockNamespacedMetric;
+import org.logstash.instrument.metrics.counter.LongCounter;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
/**
* Tests for {@link JrubyMemoryReadClientExt}.
*/
public final class JrubyMemoryReadClientExtTest extends RubyTestBase {
+ private JrubyEventExtLibrary.RubyEvent testEvent;
+ private BlockingQueue queue;
+ private AbstractNamespacedMetricExt metric;
+ private LongCounter batchCounter;
+ private LongCounter batchByteSizeCounter;
+
+ @Before
+ public void setUp() {
+ testEvent = JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event());
+ queue = new ArrayBlockingQueue<>(10);
+ metric = MockNamespacedMetric.create();
+ ThreadContext context = metric.getRuntime().getCurrentContext();
+ batchCounter = LongCounter.fromRubyBase(metric.namespace(context, MetricKeys.BATCH_KEY), MetricKeys.BATCH_COUNT);
+ batchByteSizeCounter = LongCounter.fromRubyBase(metric.namespace(context, MetricKeys.BATCH_KEY), MetricKeys.BATCH_TOTAL_BYTES);
+ }
+
@Test
@SuppressWarnings("deprecation")
public void testInflightBatchesTracking() throws InterruptedException, IOException {
- final BlockingQueue queue =
- new ArrayBlockingQueue<>(10);
final JrubyMemoryReadClientExt client =
JrubyMemoryReadClientExt.create(queue, 5, 50);
final ThreadContext context = client.getRuntime().getCurrentContext();
+ client.setPipelineMetric(metric);
final QueueBatch batch = client.readBatch();
final RubyHash inflight = client.rubyGetInflightBatches(context);
assertThat(inflight.size(), is(1));
@@ -53,4 +77,55 @@ public void testInflightBatchesTracking() throws InterruptedException, IOExcepti
client.closeBatch(batch);
assertThat(client.rubyGetInflightBatches(context).size(), is(0));
}
+
+ @Test
+ public void givenNonEmptyQueueWhenBatchIsReadThenBatchCounterMetricIsUpdated() throws InterruptedException {
+ queue.add(testEvent);
+
+ final JrubyMemoryReadClientExt client = JrubyMemoryReadClientExt.create(queue, 5, 50,
+ QueueFactoryExt.BatchMetricMode.FULL);
+ client.setPipelineMetric(metric);
+
+ final QueueBatch batch = client.readBatch();
+ assertEquals(1, batch.filteredSize());
+ assertEquals(1L, batchCounter.getValue().longValue());
+ }
+
+ @Test
+ public void givenNonEmptyQueueWhenBatchIsReadAndMetricIsDisabledThenBatchCounterMetricIsNotUpdated() throws InterruptedException {
+ queue.add(testEvent);
+
+ final JrubyMemoryReadClientExt client = JrubyMemoryReadClientExt.create(queue, 5, 50,
+ QueueFactoryExt.BatchMetricMode.DISABLED);
+ client.setPipelineMetric(metric);
+
+ final QueueBatch batch = client.readBatch();
+ assertEquals(1, batch.filteredSize());
+ assertEquals(0L, batchCounter.getValue().longValue());
+ }
+
+ @Test
+ public void givenEmptyQueueWhenEmptyBatchIsReadAndMetricIsFullyCollectedThenBatchCounterMetricIsNotUpdated() throws InterruptedException {
+ final JrubyMemoryReadClientExt client = JrubyMemoryReadClientExt.create(queue, 5, 50,
+ QueueFactoryExt.BatchMetricMode.FULL);
+ client.setPipelineMetric(metric);
+
+ final QueueBatch batch = client.readBatch();
+ assertEquals(0, batch.filteredSize());
+ assertEquals(0L, batchCounter.getValue().longValue());
+ }
+
+ @Test
+ public void givenNonEmptyQueueWhenBatchIsReadThenBatchByteSizeMetricIsUpdated() throws InterruptedException {
+ final long expectedBatchByteSize = testEvent.getEvent().estimateMemory();
+ queue.add(testEvent);
+
+ final JrubyMemoryReadClientExt client = JrubyMemoryReadClientExt.create(queue, 5, 50,
+ QueueFactoryExt.BatchMetricMode.FULL);
+ client.setPipelineMetric(metric);
+
+ final QueueBatch batch = client.readBatch();
+ assertEquals(1, batch.filteredSize());
+ assertEquals(expectedBatchByteSize, batchByteSizeCounter.getValue().longValue());
+ }
}
diff --git a/logstash-core/src/test/java/org/logstash/instrument/metrics/MockNamespacedMetric.java b/logstash-core/src/test/java/org/logstash/instrument/metrics/MockNamespacedMetric.java
new file mode 100644
index 00000000000..91ea0e49d01
--- /dev/null
+++ b/logstash-core/src/test/java/org/logstash/instrument/metrics/MockNamespacedMetric.java
@@ -0,0 +1,101 @@
+package org.logstash.instrument.metrics;
+
+import org.jruby.Ruby;
+import org.jruby.RubyArray;
+import org.jruby.RubyClass;
+import org.jruby.RubySymbol;
+import org.jruby.runtime.Block;
+import org.jruby.runtime.ThreadContext;
+import org.jruby.runtime.builtin.IRubyObject;
+import org.logstash.RubyUtil;
+import org.logstash.instrument.metrics.counter.LongCounter;
+import org.logstash.instrument.metrics.timer.TimerMetric;
+
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Trivial implementation of AbstractNamespacedMetricExt where each abstract creation
+ * metric is implemented by pooling metric instances by name.
+ * */
+@SuppressWarnings({"rawtypes", "serial"})
+public class MockNamespacedMetric extends AbstractNamespacedMetricExt {
+
+ private static final long serialVersionUID = -6507123659910450215L;
+
+ private transient final ConcurrentMap metrics = new ConcurrentHashMap<>();
+
+ public static MockNamespacedMetric create() {
+ return new MockNamespacedMetric(RubyUtil.RUBY, RubyUtil.NAMESPACED_METRIC_CLASS);
+ }
+
+ MockNamespacedMetric(final Ruby runtime, final RubyClass metaClass) {
+ super(runtime, metaClass);
+ }
+
+ @Override
+ protected IRubyObject getGauge(ThreadContext context, IRubyObject key, IRubyObject value) {
+ return null;
+ }
+
+ @Override
+ protected RubyArray getNamespaceName(ThreadContext context) {
+ return null;
+ }
+
+ @Override
+ protected IRubyObject getCounter(ThreadContext context, IRubyObject key) {
+ Objects.requireNonNull(key);
+ requireRubySymbol(key, "key");
+ return RubyUtil.toRubyObject(metrics.computeIfAbsent(key.asJavaString(), LongCounter::new));
+ }
+
+ @Override
+ protected IRubyObject getTimer(ThreadContext context, IRubyObject key) {
+ Objects.requireNonNull(key);
+ requireRubySymbol(key, "key");
+ return RubyUtil.toRubyObject(metrics.computeIfAbsent(key.asJavaString(), TimerMetric::create));
+ }
+
+ @Override
+ protected IRubyObject doTime(ThreadContext context, IRubyObject key, Block block) {
+ return null;
+ }
+
+ @Override
+ protected IRubyObject doReportTime(ThreadContext context, IRubyObject key, IRubyObject duration) {
+ return null;
+ }
+
+ @Override
+ protected IRubyObject doIncrement(ThreadContext context, IRubyObject[] args) {
+ return null;
+ }
+
+ @Override
+ protected IRubyObject doDecrement(ThreadContext context, IRubyObject[] args) {
+ return null;
+ }
+
+ @Override
+ public AbstractMetricExt getMetric() {
+ return NullMetricExt.create();
+ }
+
+ @Override
+ protected AbstractNamespacedMetricExt createNamespaced(ThreadContext context, IRubyObject name) {
+ return this;
+ }
+
+ @Override
+ protected IRubyObject getCollector(ThreadContext context) {
+ return null;
+ }
+
+ private static void requireRubySymbol(IRubyObject value, String paramName) {
+ if (!(value instanceof RubySymbol)) {
+ throw new IllegalArgumentException(paramName + " must be a RubySymbol instead was: " + value.getClass());
+ }
+ }
+}
\ No newline at end of file
diff --git a/qa/integration/specs/monitoring_api_spec.rb b/qa/integration/specs/monitoring_api_spec.rb
index 19dabc16f0d..a77c113ca78 100644
--- a/qa/integration/specs/monitoring_api_spec.rb
+++ b/qa/integration/specs/monitoring_api_spec.rb
@@ -240,6 +240,70 @@
end
end
+ context "when pipeline.batch.metrics.sampling_mode is set to 'full'" do
+ let(:settings_overrides) do
+ super().merge({'pipeline.batch.metrics.sampling_mode' => 'full'})
+ end
+
+ it "can retrieve batch stats" do
+ logstash_service.start_with_stdin
+ logstash_service.wait_for_logstash
+
+ number_of_events.times {
+ logstash_service.write_to_stdin("Testing flow metrics")
+ sleep(1)
+ }
+
+ Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
+ # node_stats can fail if the stats subsystem isn't ready
+ result = logstash_service.monitoring_api.node_stats rescue nil
+ expect(result).not_to be_nil
+ # we use fetch here since we want failed fetches to raise an exception
+ # and trigger the retry block
+ batch_stats = result.fetch("pipelines").fetch(pipeline_id).fetch("batch")
+ expect(batch_stats).not_to be_nil
+
+ expect(batch_stats["event_count"]).not_to be_nil
+ expect(batch_stats["event_count"]["average"]).not_to be_nil
+ expect(batch_stats["event_count"]["average"]["lifetime"]).not_to be_nil
+ expect(batch_stats["event_count"]["average"]["lifetime"]).to be_a_kind_of(Numeric)
+ expect(batch_stats["event_count"]["average"]["lifetime"]).to be > 0
+
+ expect(batch_stats["byte_size"]).not_to be_nil
+ expect(batch_stats["byte_size"]["average"]).not_to be_nil
+ expect(batch_stats["byte_size"]["average"]["lifetime"]).not_to be_nil
+ expect(batch_stats["byte_size"]["average"]["lifetime"]).to be_a_kind_of(Numeric)
+ expect(batch_stats["byte_size"]["average"]["lifetime"]).to be > 0
+ end
+ end
+ end
+
+ context "when pipeline.batch.metrics.sampling_mode is set to 'disabled'" do
+ let(:settings_overrides) do
+ super().merge({'pipeline.batch.metrics.sampling_mode' => 'disabled'})
+ end
+
+ it "no batch stats metrics are available" do
+ logstash_service.start_with_stdin
+ logstash_service.wait_for_logstash
+
+ number_of_events.times {
+ logstash_service.write_to_stdin("Testing flow metrics")
+ sleep(1)
+ }
+
+ Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
+ # node_stats can fail if the stats subsystem isn't ready
+ result = logstash_service.monitoring_api.node_stats rescue nil
+ expect(result).not_to be_nil
+ # we use fetch here since we want failed fetches to raise an exception
+ # and trigger the retry block
+ pipeline_stats = result.fetch("pipelines").fetch(pipeline_id)
+ expect(pipeline_stats).not_to include("batch")
+ end
+ end
+ end
+
it "retrieves the pipeline flow statuses" do
logstash_service = @fixture.get_service("logstash")
logstash_service.start_with_stdin