Skip to content

Commit f3d8b63

Browse files
committed
[Fix] Avoid to increment batch counts for empty batches
1 parent 8b6cb4a commit f3d8b63

File tree

2 files changed

+16
-0
lines changed

2 files changed

+16
-0
lines changed

logstash-core/src/main/java/org/logstash/execution/QueueReadClientBatchMetrics.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ public void setupMetrics(AbstractNamespacedMetricExt namespacedMetric) {
3838
}
3939

4040
public void updateBatchMetrics(QueueBatch batch) {
41+
if (batch.events().isEmpty()) {
42+
// avoid to increment batch count for empty batches
43+
return;
44+
}
45+
4146
if (batchMetricMode != QueueFactoryExt.BatchMetricMode.DISABLED) {
4247
boolean updateMetric = true;
4348
if (batchMetricMode == QueueFactoryExt.BatchMetricMode.MINIMAL) {

logstash-core/src/test/java/org/logstash/ext/JrubyMemoryReadClientExtTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,17 @@ public void givenNonEmptyQueueWhenBatchIsReadAndMetricIsDisabledThenBatchCounter
104104
assertEquals(0L, batchCounter.getValue().longValue());
105105
}
106106

107+
@Test
108+
public void givenEmptyQueueWhenEmptyBatchIsReadAndMetricIsFullyCollectedThenBatchCounterMetricIsNotUpdated() throws InterruptedException {
109+
final JrubyMemoryReadClientExt client = JrubyMemoryReadClientExt.create(queue, 5, 50,
110+
QueueFactoryExt.BatchMetricMode.FULL);
111+
client.setPipelineMetric(metric);
112+
113+
final QueueBatch batch = client.readBatch();
114+
assertEquals(0, batch.filteredSize());
115+
assertEquals(0L, batchCounter.getValue().longValue());
116+
}
117+
107118
@Test
108119
public void givenNonEmptyQueueWhenBatchIsReadThenBatchByteSizeMetricIsUpdated() throws InterruptedException {
109120
final long expectedBatchByteSize = testEvent.getEvent().estimateMemory();

0 commit comments

Comments
 (0)