diff --git a/docs/static/spec/openapi/logstash-api.yaml b/docs/static/spec/openapi/logstash-api.yaml index c5ac40b7374..bd6e499b7cc 100644 --- a/docs/static/spec/openapi/logstash-api.yaml +++ b/docs/static/spec/openapi/logstash-api.yaml @@ -2346,6 +2346,11 @@ components: encode: type: object properties: + goal: + - enum: + - speed + - balanced + - size ratio: type: object description: the ratio of event size in bytes to its representation on disk diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java index 8a7ee5f246b..510bb0cca6f 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java @@ -4,6 +4,8 @@ import co.elastic.logstash.api.NamespacedMetric; import com.github.luben.zstd.Zstd; +import java.util.Locale; + /** * A {@link ZstdEnabledCompressionCodec} is a {@link CompressionCodec} that can decode deflate-compressed * bytes and performs deflate compression when encoding. @@ -34,6 +36,7 @@ public enum Goal { this.internalLevel = internalLevel.internalLevel; final NamespacedMetric encodeNamespace = queueMetric.namespace("compression", "encode"); + encodeNamespace.gauge("goal", internalLevel.name().toLowerCase(Locale.ROOT)); encodeRatioMetric = encodeNamespace.namespace("ratio") .register("lifetime", AtomicIORatioMetric.FACTORY); encodeTimerMetric = encodeNamespace.namespace("spend") diff --git a/qa/integration/specs/monitoring_api_spec.rb b/qa/integration/specs/monitoring_api_spec.rb index d7da245552d..4bcead29634 100644 --- a/qa/integration/specs/monitoring_api_spec.rb +++ b/qa/integration/specs/monitoring_api_spec.rb @@ -208,7 +208,12 @@ shared_examples "pipeline metrics" do # let(:pipeline_id) { defined?(super()) or fail NotImplementedError } let(:settings_overrides) do - super().merge({'pipeline.id' => pipeline_id}) + super().dup.tap do |overrides| + overrides['pipeline.id'] = pipeline_id + if logstash_service.settings.feature_flag == "persistent_queues" + overrides['queue.compression'] = %w(none speed balanced size).sample + end + end end it "can retrieve queue stats" do @@ -242,6 +247,11 @@ queue_compression_stats = queue_stats.fetch("compression") expect(queue_compression_stats.dig('decode', 'ratio', 'lifetime')).to be >= 1 expect(queue_compression_stats.dig('decode', 'spend', 'lifetime')).not_to be_nil + if settings_overrides['queue.compression'] != 'none' + expect(queue_compression_stats.dig('encode', 'goal')).to eq(settings_overrides['queue.compression']) + expect(queue_compression_stats.dig('encode', 'ratio', 'lifetime')).to be <= 1 + expect(queue_compression_stats.dig('encode', 'spend', 'lifetime')).not_to be_nil + end else expect(queue_stats["type"]).to eq("memory") end