Skip to content

Commit 1e27899

Browse files
committed
Decoded 'pipeline.batch.metrics' setting into BatchSizeSamplingType and spread around to reach in memory queue client and control the batch size metrics. Covered with tests the readBatch code to verify the effectiveness of the flag
1 parent f34e8b4 commit 1e27899

File tree

4 files changed

+53
-8
lines changed

4 files changed

+53
-8
lines changed

logstash-core/src/main/java/org/logstash/execution/QueueReadClientBase.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,18 @@
4848
@JRubyClass(name = "QueueReadClientBase")
4949
public abstract class QueueReadClientBase extends RubyObject implements QueueReadClient {
5050

51+
public enum BatchSizeSamplingType {
52+
NONE, FULL;
53+
54+
public static BatchSizeSamplingType decode(String type) {
55+
return switch (type) {
56+
case "false" -> NONE;
57+
case "true" -> FULL;
58+
default -> throw new IllegalArgumentException("Invalid batch size type: " + type);
59+
};
60+
}
61+
}
62+
5163
private static final long serialVersionUID = 1L;
5264

5365
protected int batchSize = 125;
@@ -62,6 +74,7 @@ public abstract class QueueReadClientBase extends RubyObject implements QueueRea
6274
private transient LongCounter pipelineMetricFiltered;
6375
private transient TimerMetric pipelineMetricTime;
6476
private transient HistogramMetric pipelineMetricBatch;
77+
protected BatchSizeSamplingType batchSizeSamplingType = BatchSizeSamplingType.FULL;
6578

6679
protected QueueReadClientBase(final Ruby runtime, final RubyClass metaClass) {
6780
super(runtime, metaClass);
@@ -196,7 +209,9 @@ public void startMetrics(QueueBatch batch) {
196209
// JTODO getId has been deprecated in JDK 19, when JDK 21 is the target version use threadId() instead
197210
long threadId = Thread.currentThread().getId();
198211
inflightBatches.put(threadId, batch);
199-
pipelineMetricBatch.update(batch.filteredSize());
212+
if (batchSizeSamplingType == BatchSizeSamplingType.FULL) {
213+
pipelineMetricBatch.update(batch.filteredSize());
214+
}
200215
}
201216

202217
@Override

logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,11 @@ public JrubyMemoryReadClientExt(final Ruby runtime, final RubyClass metaClass) {
4747

4848
@SuppressWarnings("rawtypes")
4949
private JrubyMemoryReadClientExt(final Ruby runtime, final RubyClass metaClass,
50-
BlockingQueue queue, int batchSize, int waitForMillis) {
50+
BlockingQueue queue, int batchSize, int waitForMillis,
51+
BatchSizeSamplingType batchSizeSamplingType) {
5152
super(runtime, metaClass);
5253
this.queue = queue;
54+
this.batchSizeSamplingType = batchSizeSamplingType;
5355
this.batchSize = batchSize;
5456
this.waitForNanos = TimeUnit.NANOSECONDS.convert(waitForMillis, TimeUnit.MILLISECONDS);
5557
this.waitForMillis = waitForMillis;
@@ -58,8 +60,14 @@ private JrubyMemoryReadClientExt(final Ruby runtime, final RubyClass metaClass,
5860
@SuppressWarnings("rawtypes")
5961
public static JrubyMemoryReadClientExt create(BlockingQueue queue, int batchSize,
6062
int waitForMillis) {
63+
return create(queue, batchSize, waitForMillis, BatchSizeSamplingType.FULL);
64+
}
65+
66+
@SuppressWarnings("rawtypes")
67+
public static JrubyMemoryReadClientExt create(BlockingQueue queue, int batchSize, int waitForMillis,
68+
BatchSizeSamplingType batchSizeSamplingType) {
6169
return new JrubyMemoryReadClientExt(RubyUtil.RUBY,
62-
RubyUtil.MEMORY_READ_CLIENT_CLASS, queue, batchSize, waitForMillis);
70+
RubyUtil.MEMORY_READ_CLIENT_CLASS, queue, batchSize, waitForMillis, batchSizeSamplingType);
6371
}
6472

6573
@Override

logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.jruby.Ruby;
2727
import org.jruby.RubyClass;
2828
import org.jruby.RubyNumeric;
29-
import org.jruby.RubyString;
3029
import org.jruby.anno.JRubyClass;
3130
import org.jruby.anno.JRubyMethod;
3231
import org.jruby.runtime.ThreadContext;
@@ -43,6 +42,7 @@ public final class JrubyWrappedSynchronousQueueExt extends AbstractWrappedQueueE
4342
private static final long serialVersionUID = 1L;
4443

4544
private transient BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue;
45+
private QueueReadClientBase.BatchSizeSamplingType batchMetricsSamplingType;
4646

4747
public JrubyWrappedSynchronousQueueExt(final Ruby runtime, final RubyClass metaClass) {
4848
super(runtime, metaClass);
@@ -54,7 +54,8 @@ public JrubyWrappedSynchronousQueueExt initialize(final ThreadContext context,
5454
IRubyObject size,
5555
IRubyObject batchMetricsSampling) {
5656
int typedSize = ((RubyNumeric)size).getIntValue();
57-
String batchMetricsSamplingType = ((RubyString) batchMetricsSampling).asJavaString();
57+
this.batchMetricsSamplingType = QueueReadClientBase.BatchSizeSamplingType.decode(batchMetricsSampling.asJavaString());
58+
5859
this.queue = new ArrayBlockingQueue<>(typedSize);
5960
return this;
6061
}
@@ -68,7 +69,7 @@ protected JRubyAbstractQueueWriteClientExt getWriteClient(final ThreadContext co
6869
protected QueueReadClientBase getReadClient() {
6970
// batch size and timeout are currently hard-coded to 125 and 50ms as values observed
7071
// to be reasonable tradeoffs between latency and throughput per PR #8707
71-
return JrubyMemoryReadClientExt.create(queue, 125, 50);
72+
return JrubyMemoryReadClientExt.create(queue, 125, 50, batchMetricsSamplingType);
7273
}
7374

7475
@Override

logstash-core/src/test/java/org/logstash/ext/JrubyMemoryReadClientExtTest.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.logstash.RubyTestBase;
3131
import org.logstash.RubyUtil;
3232
import org.logstash.execution.QueueBatch;
33+
import org.logstash.execution.QueueReadClientBase.BatchSizeSamplingType;
3334
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
3435
import org.logstash.instrument.metrics.MetricKeys;
3536
import org.logstash.instrument.metrics.MockNamespacedMetric;
@@ -49,7 +50,7 @@ public final class JrubyMemoryReadClientExtTest extends RubyTestBase {
4950
@SuppressWarnings("deprecation")
5051
public void testInflightBatchesTracking() throws InterruptedException, IOException {
5152
final BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue = new ArrayBlockingQueue<>(10);
52-
final JrubyMemoryReadClientExt client = JrubyMemoryReadClientExt.create(queue, 5, 50);
53+
final JrubyMemoryReadClientExt client = JrubyMemoryReadClientExt.create(queue, 5, 50, BatchSizeSamplingType.NONE);
5354

5455
final ThreadContext context = client.getRuntime().getCurrentContext();
5556

@@ -71,7 +72,7 @@ public void givenNonEmptyQueueAndEnabledBatchSizeMetricThenHistogramContainsData
7172
final BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue = new ArrayBlockingQueue<>(10);
7273
queue.add(testEvent);
7374

74-
final JrubyMemoryReadClientExt client = JrubyMemoryReadClientExt.create(queue, 5, 50);
75+
final JrubyMemoryReadClientExt client = JrubyMemoryReadClientExt.create(queue, 5, 50, BatchSizeSamplingType.FULL);
7576

7677
AbstractNamespacedMetricExt metric = MockNamespacedMetric.create();
7778
client.setPipelineMetric(metric);
@@ -84,4 +85,24 @@ public void givenNonEmptyQueueAndEnabledBatchSizeMetricThenHistogramContainsData
8485
assertEquals(1.0, metricSnapshot.get75Percentile(), 0.0001);
8586
assertEquals(1.0, metricSnapshot.get90Percentile(), 0.0001);
8687
}
88+
89+
@Test
90+
public void givenNonEmptyQueueAndDisabledBatchSizeMetricThenHistogramIsNotPopulated() throws InterruptedException {
91+
final JrubyEventExtLibrary.RubyEvent testEvent = JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event());
92+
final BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue = new ArrayBlockingQueue<>(10);
93+
queue.add(testEvent);
94+
95+
final JrubyMemoryReadClientExt client = JrubyMemoryReadClientExt.create(queue, 5, 50, BatchSizeSamplingType.NONE);
96+
97+
AbstractNamespacedMetricExt metric = MockNamespacedMetric.create();
98+
client.setPipelineMetric(metric);
99+
100+
final QueueBatch batch = client.readBatch();
101+
assertEquals(1, batch.filteredSize());
102+
103+
HistogramMetric histogram = HistogramMetric.fromRubyBase(metric, MetricKeys.BATCH_SIZE_KEY);
104+
HistogramSnapshot metricSnapshot = histogram.getValue();
105+
assertEquals(0.0, metricSnapshot.get75Percentile(), 0.0001);
106+
assertEquals(0.0, metricSnapshot.get90Percentile(), 0.0001);
107+
}
87108
}

0 commit comments

Comments
 (0)