Conversation

@yaauie (Member) commented Sep 26, 2025

Release notes

[rn: skip]

What does this PR do?

Adds metrics to the pipeline's queue namespace to reflect the cost and benefit of compression (see: #18107):

| name | definition | expected value range |
| --- | --- | --- |
| encode.spend.lifetime | encode_time / uptime | [0, N_CPUS] |
| encode.ratio.lifetime | compressed_bytes / decompressed_bytes | [0, 1] |
| decode.spend.lifetime | decode_time / uptime | [0, N_CPUS] |
| decode.ratio.lifetime | decompressed_bytes / compressed_bytes | [1, ∞) |
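
As a concrete, hypothetical reading of these definitions: if the pipeline has spent 30 s of CPU time encoding over a 600 s uptime, encode.spend.lifetime is 30/600 = 0.05; and if 1024 MiB of serialized events compressed down to 256 MiB, encode.ratio.lifetime is 256/1024 = 0.25, while the decode ratio observed when reading those bytes back would be 1024/256 = 4.0.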

Why is it important/What is the impact to the user?

It gives a user insight into the cost and benefit of compression options, so that they can make informed decisions about expending resources.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • [ ] I have made corresponding change to the default configuration files (and/or docker env variables)
  • I have added tests that prove my fix is effective or that my feature works

Author's Checklist

  • [ ]

How to test this PR locally

  1. Create an ndjson file example-input.ndjson:
    {"this":"that","banana":["orange",{"elephant":1}]}
    {"event":{"original":"this is an original event"}}
    
  2. Add my feed utility to your path
  3. Invoke Logstash with options that activate the PQ and compression, feeding your input to stdin:
    bin/logstash --log.level=info \
    -Squeue.type=persisted \
    -Squeue.drain=true \
    -Squeue.compression=balanced \
    --config.string 'input { stdin { codec => json_lines } } output { sink {} }' \
    < <(feed example-input.ndjson)
    
  4. While that runs, query the HTTP API:
    curl --silent 'localhost:9600/_node/stats?pretty=true' | jq .pipelines.main.queue.compression
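
    The jq filter should return the compression metrics described in the table above. The exact field layout is not shown in this PR description, but based on those metric names an illustrative (hypothetical) response might look roughly like:

    {
      "encode": {
        "spend": { "lifetime": 0.012 },
        "ratio": { "lifetime": 0.41 }
      },
      "decode": {
        "spend": { "lifetime": 0.003 },
        "ratio": { "lifetime": 2.44 }
      }
    }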
    

@github-actions (Contributor) commented

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

mergify bot commented Sep 26, 2025

This pull request does not have a backport label. Could you fix it @yaauie? 🙏
To fix up this pull request, you need to add the backport labels for the needed branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch (/d is the digit).
  • If no backport is necessary, please add the backport-skip label.

@github-actions (Contributor) commented

🔍 Preview links for changed docs

@yaauie force-pushed the pq-compression-user-metrics branch from 91da26c to 4d9a9dc on September 29, 2025 at 15:58
@yaauie force-pushed the pq-compression-user-metrics branch from 4d9a9dc to e0f1215 on September 29, 2025 at 16:12
@yaauie marked this pull request as ready for review on September 29, 2025 at 17:30
@yaauie added the backport-skip (Skip automated backport with mergify) label on Sep 29, 2025
    return new ImmutableRatio(combinedBytesIn, combinedBytesOut);
} catch (ArithmeticException e) {
    // don't crash, just start over.
    return new ImmutableRatio(bytesIn, bytesOut);
@yaauie (Member, Author):
A less lossy alternative would be to shift the stored number (dividing it by 2 or 4).

But even at a sustained 1 GiB/sec this counter would take ~272 years to overflow, so it's not really a concern in practice.
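
(For scale, assuming the accumulator is a signed 64-bit byte counter: Long.MAX_VALUE = 2^63 − 1 ≈ 9.2 × 10^18 bytes, and 1 GiB/s is 2^30 bytes/s, so overflow requires roughly 2^33 s ≈ 272 years of sustained throughput.)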

    return new ImmutableRatio(combinedBytesIn, combinedBytesOut);
} catch (ArithmeticException e) {
    // don't crash, just start over.
    return new ImmutableRatio(bytesIn, bytesOut);
Member (reviewer):
Can we log here, just in case there is some misplaced calculation that triggers this?

@yaauie (Member, Author):
I've addressed this in 7ae2395 by halving the existing long values and logging when that happens. Combined with changing the interface of IORatioMetric#incrementBy to take int, int, this should make those invalid calculations even harder to come by.
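
A minimal sketch of that approach, assuming the accumulated totals live behind an AtomicReference (the class and method names below are hypothetical illustrations, not the actual AtomicIORatioMetric implementation):

    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical sketch: negative increments are logged and ignored, and a long
    // overflow is handled by halving both accumulated counters (which approximately
    // preserves the ratio) instead of crashing the pipeline.
    final class RatioAccumulatorSketch {
        record Totals(long bytesIn, long bytesOut) {}

        private final AtomicReference<Totals> totals = new AtomicReference<>(new Totals(0L, 0L));

        void incrementBy(final int bytesIn, final int bytesOut) {
            if (bytesIn < 0 || bytesOut < 0) {
                // byte counts never shrink; log the bogus call and ignore it
                System.err.printf("ignoring attempt to decrement ratio metric (%d, %d)%n", bytesIn, bytesOut);
                return;
            }
            totals.updateAndGet(current -> {
                try {
                    return new Totals(Math.addExact(current.bytesIn(), bytesIn),
                                      Math.addExact(current.bytesOut(), bytesOut));
                } catch (ArithmeticException e) {
                    // overflow: halve the stored counters to retain precision, log, then apply the increment
                    System.err.println("ratio metric counters overflowed; halving accumulated values");
                    return new Totals((current.bytesIn() >>> 1) + bytesIn,
                                      (current.bytesOut() >>> 1) + bytesOut);
                }
            });
        }

        Totals snapshot() {
            return totals.get();
        }
    }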

compression:
  type: object
  properties:
    encode:
Member (reviewer):
Would including the goal (speed, balanced, etc.) in the payload provide value for monitoring, or for checking diags, without needing to refer back to logstash.yml?

(This is OK to add as a follow-on PR if we think it is worthwhile.)

@yaauie (Member, Author) commented Sep 29, 2025:
This would be trivial.

But the queue.compression.encode namespace is only present when encoding operations are possible (e.g., speed, balanced, size) and missing when configured with none. Do you have a preference for where we would put it?

Member (reviewer):
I think queue.compression.encode.goal would work ok if that namespace is present. If it is absent, then none is implied, which makes sense, I think


if (bytesIn.signum() == 0) {
    return switch (bytesOut.signum()) {
        case -1 -> Double.NEGATIVE_INFINITY;
Member (reviewer):
Is this possible? Or should incrementBy only accept positive values?

@yaauie (Member, Author):
I've added safeguards to what is acceptable, so this should never get hit, but I would rather leave it in place instead of crashing if the impossible occurs.
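
The defensive mapping under discussion can be sketched as a standalone helper (hypothetical code; the real method works on the metric's accumulated totals, and the value chosen for the 0/0 case is an assumption here):

    final class RatioEdgeCases {
        // The ratio bytesOut / bytesIn cannot be computed when bytesIn == 0, so map the
        // result by the sign of bytesOut instead of throwing.
        static double safeRatio(final long bytesIn, final long bytesOut) {
            if (bytesIn == 0L) {
                return switch (Long.signum(bytesOut)) {
                    case -1 -> Double.NEGATIVE_INFINITY; // unreachable once inputs are guarded, but don't crash
                    case 0  -> Double.NaN;               // no data yet (assumed mapping)
                    default -> Double.POSITIVE_INFINITY;
                };
            }
            return (double) bytesOut / (double) bytesIn;
        }
    }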

public void testZeroBytesInNegativeBytesOut() {
    final AtomicIORatioMetric ioRatioMetric = new AtomicIORatioMetric("name");

    ioRatioMetric.incrementBy(0, -768);
Member (reviewer):
Is this a use case that the ratio should support? Or should this raise an IllegalArgumentException?

@yaauie (Member, Author):
I've addressed this in 7ae2395 by making it log and ignore attempts to decrement. This is only ever called with the result of byte[]#length (which is never negative), but I would very much not like this to be a crashing problem that puts in-flight events at risk.

By using `int` type in `IORatioMetric#incrementBy(int,int)`, we simplify the
failure scenarios while still allowing the desired behaviour, since this is
always called in practice with `byte[]#length`.

We ensure that attempts to decrement the value are ignored, and result in a
log message, and that overflows reduce precision and are also logged.

Together, these ensure that long overflows won't ever result in pipeline
crashes.
@yaauie requested a review from robbavey on September 29, 2025 at 21:30
@robbavey commented — this comment was marked as outdated.

@robbavey (Member) left a comment:
Changes made LGTM

@robbavey dismissed their stale review on September 29, 2025 at 21:41 ("Premature full LGTM")

@elasticmachine (Collaborator) commented

💛 Build succeeded, but was flaky

@robbavey (Member) left a comment:
LGTM

@yaauie merged commit 1b3b3ee into elastic:main on Sep 29, 2025
12 checks passed
v1v pushed a commit that referenced this pull request Oct 21, 2025
* pq-compression: wire through custom user metrics for ratio/spend

* add test for queue compression metrics

* pq metrics: IORatioMetric edge-case logging, use int at interface

By using `int` type in `IORatioMetric#incrementBy(int,int)`, we simplify the
failure scenarios while still allowing the desired behaviour, since this is
always called in practice with `byte[]#length`.

We ensure that attempts to decrement the value are ignored, and result in a
log message, and that overflows reduce precision and are also logged.

Together, these ensure that long overflows won't ever result in pipeline
crashes.

Labels

backport-skip Skip automated backport with mergify
