-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Pq compression user metrics #18227
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pq compression user metrics #18227
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,95 @@ | ||
| package org.logstash.ackedqueue; | ||
|
|
||
| import co.elastic.logstash.api.UserMetric; | ||
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
| import org.logstash.instrument.metrics.AbstractMetric; | ||
|
|
||
| import java.math.BigDecimal; | ||
| import java.math.MathContext; | ||
| import java.math.RoundingMode; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
|
|
||
| /** | ||
| * It uses {@code long} under the hood, and is capable of handling sustained 1GiB/sec | ||
| * for ~272 years before overflowing. | ||
| */ | ||
| class AtomicIORatioMetric extends AbstractMetric<Double> implements IORatioMetric { | ||
|
|
||
| public static UserMetric.Factory<IORatioMetric> FACTORY = IORatioMetric.PROVIDER.getFactory(AtomicIORatioMetric::new); | ||
|
|
||
| private static final MathContext LIMITED_PRECISION = new MathContext(4, RoundingMode.HALF_UP); | ||
| private static final ImmutableRatio ZERO = new ImmutableRatio(0L, 0L); | ||
| private static final Logger LOGGER = LogManager.getLogger(AtomicIORatioMetric.class); | ||
|
|
||
| private final AtomicReference<ImmutableRatio> atomicReference = new AtomicReference<>(ZERO); | ||
| private final Logger logger; | ||
|
|
||
| AtomicIORatioMetric(final String name) { | ||
| this(name, LOGGER); | ||
| } | ||
|
|
||
| AtomicIORatioMetric(final String name, final Logger logger) { | ||
| super(name); | ||
| this.logger = logger; | ||
| } | ||
|
|
||
| @Override | ||
| public Value getLifetime() { | ||
| return atomicReference.get(); | ||
| } | ||
|
|
||
| @Override | ||
| public void incrementBy(int bytesIn, int bytesOut) { | ||
| if (bytesIn < 0 || bytesOut < 0) { | ||
| logger.warn("cannot decrement IORatioMetric {}", this.getName()); | ||
| return; | ||
| } | ||
| this.atomicReference.getAndUpdate((existing) -> doIncrement(existing, bytesIn, bytesOut)); | ||
| } | ||
|
|
||
| // test injection | ||
| void setTo(long bytesIn, long bytesOut) { | ||
| this.atomicReference.set(new ImmutableRatio(bytesIn, bytesOut)); | ||
| } | ||
|
|
||
| @Override | ||
| public Double getValue() { | ||
| final Value snapshot = getLifetime(); | ||
|
|
||
| final BigDecimal bytesIn = BigDecimal.valueOf(snapshot.bytesIn()); | ||
| final BigDecimal bytesOut = BigDecimal.valueOf(snapshot.bytesOut()); | ||
|
|
||
| if (bytesIn.signum() == 0) { | ||
| return switch(bytesOut.signum()) { | ||
| case -1 -> Double.NEGATIVE_INFINITY; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this possible? Or should incrementBy only accept positive values?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added safeguards to what is acceptable, so this should never get hit, but I would rather leave it in place instead of crashing if the impossible occurs. |
||
| case 1 -> Double.POSITIVE_INFINITY; | ||
| default -> Double.NaN; | ||
| }; | ||
| } | ||
|
|
||
| return bytesOut.divide(bytesIn, LIMITED_PRECISION).doubleValue(); | ||
| } | ||
|
|
||
| public void reset() { | ||
| this.atomicReference.set(ZERO); | ||
| } | ||
|
|
||
| private ImmutableRatio doIncrement(final ImmutableRatio existing, final int bytesIn, final int bytesOut) { | ||
|
|
||
| final long combinedBytesIn = existing.bytesIn() + bytesIn; | ||
| final long combinedBytesOut = existing.bytesOut() + bytesOut; | ||
|
|
||
| if (combinedBytesIn < 0 || combinedBytesOut < 0) { | ||
| logger.warn("long overflow; precision will be reduced"); | ||
| final long reducedBytesIn = Math.addExact(Math.floorDiv(existing.bytesIn(), 2), bytesIn); | ||
| final long reducedBytesOut = Math.addExact(Math.floorDiv(existing.bytesOut(), 2), bytesOut); | ||
|
|
||
| return new ImmutableRatio(reducedBytesIn, reducedBytesOut); | ||
| } | ||
|
|
||
| return new ImmutableRatio(combinedBytesIn, combinedBytesOut); | ||
| } | ||
|
|
||
| public record ImmutableRatio(long bytesIn, long bytesOut) implements Value { } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| package org.logstash.ackedqueue; | ||
|
|
||
| import org.logstash.instrument.metrics.AbstractMetric; | ||
| import org.logstash.instrument.metrics.UptimeMetric; | ||
| import org.logstash.instrument.metrics.timer.TimerMetric; | ||
| import org.logstash.instrument.metrics.timer.TimerMetricFactory; | ||
|
|
||
| import java.math.BigDecimal; | ||
| import java.math.MathContext; | ||
| import java.math.RoundingMode; | ||
|
|
||
| class CalculatedRelativeSpendMetric extends AbstractMetric<Double> implements RelativeSpendMetric { | ||
| private static final MathContext LIMITED_PRECISION = new MathContext(4, RoundingMode.HALF_UP); | ||
|
|
||
| private final TimerMetric spendMetric; | ||
| private final UptimeMetric uptimeMetric; | ||
|
|
||
| public static Factory<RelativeSpendMetric> FACTORY = RelativeSpendMetric.PROVIDER.getFactory(CalculatedRelativeSpendMetric::new); | ||
|
|
||
| public CalculatedRelativeSpendMetric(final String name) { | ||
| this(name, TimerMetricFactory.getInstance().create(name + ":spend"), new UptimeMetric(name + ":uptime")); | ||
| } | ||
|
|
||
| CalculatedRelativeSpendMetric(String name, TimerMetric spendMetric, UptimeMetric uptimeMetric) { | ||
| super(name); | ||
| this.spendMetric = spendMetric; | ||
| this.uptimeMetric = uptimeMetric; | ||
| } | ||
|
|
||
| @Override | ||
| public <T, E extends Throwable> T time(ExceptionalSupplier<T, E> exceptionalSupplier) throws E { | ||
| return this.spendMetric.time(exceptionalSupplier); | ||
| } | ||
|
|
||
| @Override | ||
| public void reportUntrackedMillis(long untrackedMillis) { | ||
| this.spendMetric.reportUntrackedMillis(untrackedMillis); | ||
| } | ||
|
|
||
| @Override | ||
| public Double getValue() { | ||
| BigDecimal spend = BigDecimal.valueOf(spendMetric.getValue()); | ||
| BigDecimal uptime = BigDecimal.valueOf(uptimeMetric.getValue()); | ||
|
|
||
| if (uptime.signum() == 0) { | ||
| switch (spend.signum()) { | ||
| case -1: | ||
| return Double.NEGATIVE_INFINITY; | ||
| case 0: | ||
| return 0.0; | ||
| case +1: | ||
| return Double.POSITIVE_INFINITY; | ||
| } | ||
| } | ||
|
|
||
| return spend.divide(uptime, LIMITED_PRECISION).doubleValue(); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| package org.logstash.ackedqueue; | ||
|
|
||
| import co.elastic.logstash.api.UserMetric; | ||
| import org.logstash.instrument.metrics.MetricType; | ||
|
|
||
| /** | ||
| * A {@code IORatioMetric} is a custom metric that tracks the ratio of input to output. | ||
| */ | ||
| interface IORatioMetric extends UserMetric<Double>, org.logstash.instrument.metrics.Metric<Double> { | ||
| Double getValue(); | ||
|
|
||
| Value getLifetime(); | ||
|
|
||
| void incrementBy(int bytesIn, int bytesOut); | ||
|
|
||
| @Override | ||
| default MetricType getType() { | ||
| return MetricType.USER; | ||
| } | ||
|
|
||
| // NOTE: at 100GiB/sec, this value type has capacity for ~272 years. | ||
| interface Value { | ||
| long bytesIn(); | ||
|
|
||
| long bytesOut(); | ||
| } | ||
|
|
||
| Provider<IORatioMetric> PROVIDER = new Provider<>(IORatioMetric.class, new IORatioMetric() { | ||
| @Override | ||
| public Double getValue() { | ||
| return Double.NaN; | ||
| } | ||
|
|
||
| @Override | ||
| public Value getLifetime() { | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public void incrementBy(int bytesIn, int bytesOut) { | ||
| // no-op | ||
| } | ||
|
|
||
| @Override | ||
| public String getName() { | ||
| return "NULL"; | ||
| } | ||
| }); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would including the
goal(speed,balanced, etc) in the payload provide value for monitoring, or checking diags without needing to refer back to logstash.yml?(This is ok to add as a follow-on PR if we think it is worthwhile)
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be trivial.
But the
queue.compression.encodenamespace is only present when encoding operations are possible (e.g.,speed,balanced,size) and missing when configured withnone. Do you have a preference for where we would put it?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think
queue.compression.encode.goalwould work ok if that namespace is present. If it is absent, thennoneis implied, which makes sense, I think