Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/static/spec/openapi/logstash-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2346,6 +2346,11 @@ components:
encode:
type: object
properties:
goal:
- enum:
- speed
- balanced
- size
ratio:
type: object
description: the ratio of event size in bytes to its representation on disk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import co.elastic.logstash.api.NamespacedMetric;
import com.github.luben.zstd.Zstd;

import java.util.Locale;

/**
* A {@link ZstdEnabledCompressionCodec} is a {@link CompressionCodec} that can decode deflate-compressed
* bytes and performs deflate compression when encoding.
Expand Down Expand Up @@ -34,6 +36,7 @@ public enum Goal {
this.internalLevel = internalLevel.internalLevel;

final NamespacedMetric encodeNamespace = queueMetric.namespace("compression", "encode");
encodeNamespace.gauge("goal", internalLevel.name().toLowerCase(Locale.ROOT));
encodeRatioMetric = encodeNamespace.namespace("ratio")
.register("lifetime", AtomicIORatioMetric.FACTORY);
encodeTimerMetric = encodeNamespace.namespace("spend")
Expand Down
12 changes: 11 additions & 1 deletion qa/integration/specs/monitoring_api_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,12 @@
shared_examples "pipeline metrics" do
# let(:pipeline_id) { defined?(super()) or fail NotImplementedError }
let(:settings_overrides) do
super().merge({'pipeline.id' => pipeline_id})
super().dup.tap do |overrides|
overrides['pipeline.id'] = pipeline_id
if logstash_service.settings.feature_flag == "persistent_queues"
overrides['queue.compression'] = %w(none speed balanced size).sample
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made the decision to sample the available values instead of exploding the matrix by 4x.

end
end
end

it "can retrieve queue stats" do
Expand Down Expand Up @@ -242,6 +247,11 @@
queue_compression_stats = queue_stats.fetch("compression")
expect(queue_compression_stats.dig('decode', 'ratio', 'lifetime')).to be >= 1
expect(queue_compression_stats.dig('decode', 'spend', 'lifetime')).not_to be_nil
if settings_overrides['queue.compression'] != 'none'
expect(queue_compression_stats.dig('encode', 'goal')).to eq(settings_overrides['queue.compression'])
expect(queue_compression_stats.dig('encode', 'ratio', 'lifetime')).to be <= 1
expect(queue_compression_stats.dig('encode', 'spend', 'lifetime')).not_to be_nil
end
else
expect(queue_stats["type"]).to eq("memory")
end
Expand Down
Loading