1+ package org .logstash .execution ;
2+
3+ import static org .hamcrest .MatcherAssert .assertThat ;
4+ import org .jruby .RubyArray ;
5+ import org .jruby .runtime .ThreadContext ;
6+ import org .jruby .runtime .builtin .IRubyObject ;
7+ import org .junit .Before ;
8+ import org .junit .Test ;
9+ import org .logstash .Event ;
10+ import org .logstash .RubyUtil ;
11+ import org .logstash .ackedqueue .QueueFactoryExt ;
12+ import org .logstash .ext .JrubyEventExtLibrary ;
13+ import org .logstash .instrument .metrics .AbstractNamespacedMetricExt ;
14+ import org .logstash .instrument .metrics .MetricKeys ;
15+ import org .logstash .instrument .metrics .MockNamespacedMetric ;
16+ import org .logstash .instrument .metrics .counter .LongCounter ;
17+
18+ import java .io .IOException ;
19+ import java .util .ArrayList ;
20+ import java .util .Arrays ;
21+ import java .util .Collection ;
22+ import java .util .List ;
23+
24+ import static org .junit .Assert .assertEquals ;
25+
26+ public class QueueReadClientBatchMetricsTest {
27+
28+ public static final class MockQueueBatch implements QueueBatch {
29+
30+ private final long processingTimeNanos ;
31+ private final List <JrubyEventExtLibrary .RubyEvent > events ;
32+
33+ public MockQueueBatch (long processingTimeNanos , JrubyEventExtLibrary .RubyEvent ... events ) {
34+ this .processingTimeNanos = processingTimeNanos ;
35+ this .events = Arrays .stream (events ).toList ();
36+ }
37+
38+ @ Override
39+ @ SuppressWarnings ("unchecked" )
40+ public RubyArray <JrubyEventExtLibrary .RubyEvent > to_a () {
41+ List <IRubyObject > list = new ArrayList <>(events );
42+ return (RubyArray <JrubyEventExtLibrary .RubyEvent >) RubyUtil .RUBY .newArray (list );
43+ }
44+
45+ @ Override
46+ @ SuppressWarnings ("unchecked" )
47+ public Collection <JrubyEventExtLibrary .RubyEvent > events () {
48+ return to_a ();
49+ }
50+
51+ @ Override
52+ public void close () throws IOException {
53+ // no-op
54+ }
55+
56+ @ Override
57+ public int filteredSize () {
58+ return events .size ();
59+ }
60+
61+ public long getProcessingTimeNanos () {
62+ return processingTimeNanos ;
63+ }
64+ }
65+
66+ private AbstractNamespacedMetricExt metric ;
67+ private QueueReadClientBatchMetrics sut ;
68+ private LongCounter batchCounter ;
69+ private LongCounter batchByteSizeCounter ;
70+ private JrubyEventExtLibrary .RubyEvent rubyEvent ;
71+
72+ @ Before
73+ public void setUp () {
74+ metric = MockNamespacedMetric .create ();
75+ sut = new QueueReadClientBatchMetrics (QueueFactoryExt .BatchMetricMode .FULL );
76+ sut .setupMetrics (metric );
77+
78+ ThreadContext context = metric .getRuntime ().getCurrentContext ();
79+ batchCounter = LongCounter .fromRubyBase (metric .namespace (context , MetricKeys .BATCH_KEY ), MetricKeys .BATCH_COUNT );
80+ batchByteSizeCounter = LongCounter .fromRubyBase (metric .namespace (context , MetricKeys .BATCH_KEY ), MetricKeys .BATCH_TOTAL_BYTES );
81+
82+ rubyEvent = JrubyEventExtLibrary .RubyEvent .newRubyEvent (RubyUtil .RUBY , new Event ());
83+ }
84+
85+ @ Test
86+ public void givenEmptyBatchAndFullMetricsWhenUpdateBatchMetricsThenNoMetricsAreUpdated () {
87+ QueueBatch emptyBatch = new MockQueueBatch (10 );
88+
89+ sut .updateBatchMetrics (emptyBatch );
90+
91+ assertEquals (0L , batchCounter .getValue ().longValue ());
92+ }
93+
94+ @ Test
95+ public void givenNonEmptyBatchAndFullMetricsWhenUpdateBatchMetricsThenMetricsAreUpdated () {
96+ QueueBatch batch = new MockQueueBatch (10 , rubyEvent );
97+ final long expectedBatchByteSize = rubyEvent .getEvent ().estimateMemory ();
98+
99+ sut .updateBatchMetrics (batch );
100+
101+ assertEquals (1L , batchCounter .getValue ().longValue ());
102+ assertEquals (expectedBatchByteSize , batchByteSizeCounter .getValue ().longValue ());
103+ }
104+
105+ @ Test
106+ public void givenNonEmptyBatchesAndMinimalMetricsThenMetricsAreUpdated () {
107+ sut = new QueueReadClientBatchMetrics (QueueFactoryExt .BatchMetricMode .MINIMAL );
108+ sut .setupMetrics (metric );
109+
110+ QueueBatch batch = new MockQueueBatch (10 , rubyEvent );
111+ final long expectedBatchByteSize = rubyEvent .getEvent ().estimateMemory ();
112+
113+ for (int i = 0 ; i < 200 ; i ++) {
114+ sut .updateBatchMetrics (batch );
115+ }
116+ sut .updateBatchMetrics (batch );
117+
118+ assertThat (batchCounter .getValue (), org .hamcrest .Matchers .greaterThan (1L ));
119+ assertThat (batchByteSizeCounter .getValue (), org .hamcrest .Matchers .greaterThan (expectedBatchByteSize ));
120+ }
121+
122+ @ Test
123+ public void givenNonEmptyQueueWhenBatchIsReadAndMetricIsDisabledThenBatchCounterMetricIsNotUpdated () {
124+ sut = new QueueReadClientBatchMetrics (QueueFactoryExt .BatchMetricMode .DISABLED );
125+ sut .setupMetrics (metric );
126+ QueueBatch batch = new MockQueueBatch (10 , rubyEvent );
127+
128+ sut .updateBatchMetrics (batch );
129+
130+ assertEquals (0L , batchCounter .getValue ().longValue ());
131+ assertEquals (0L , batchByteSizeCounter .getValue ().longValue ());
132+ }
133+ }
0 commit comments