Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions docs/static/spec/openapi/logstash-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2340,6 +2340,59 @@ components:
max_queue_size_in_bytes:
type: integer
format: int64
compression:
type: object
properties:
encode:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would including the goal (speed, balanced, etc) in the payload provide value for monitoring, or checking diags without needing to refer back to logstash.yml?

(This is ok to add as a follow-on PR if we think it is worthwhile)

Copy link
Member Author

@yaauie yaauie Sep 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be trivial.

But the queue.compression.encode namespace is only present when encoding operations are possible (e.g., speed, balanced, size) and missing when configured with none. Do you have a preference for where we would put it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think queue.compression.encode.goal would work ok if that namespace is present. If it is absent, then none is implied, which makes sense, I think

type: object
properties:
ratio:
type: object
description: the ratio of event size in bytes to its representation on disk
properties:
lifetime:
oneOf:
- type: number
- enum:
- "Infinity"
- "NaN"
- "-Infinity"
spend:
type: object
description: the fraction of wall-clock time spent encoding events
properties:
lifetime:
oneOf:
- type: number
- enum:
- "Infinity"
- "NaN"
- "-Infinity"
decode:
type: object
properties:
ratio:
type: object
description: the ratio of event representation on disk to event size
properties:
lifetime:
oneOf:
- type: number
- enum:
- "Infinity"
- "NaN"
- "-Infinity"
spend:
type: object
description: the fraction of wall-clock time spent decoding events
properties:
lifetime:
oneOf:
- type: number
- enum:
- "Infinity"
- "NaN"
- "-Infinity"
- type: object
description: "The metrics of memory queue."
required:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.logstash.ackedqueue;

import co.elastic.logstash.api.Metric;
import co.elastic.logstash.api.NamespacedMetric;
import com.github.luben.zstd.Zstd;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -13,13 +15,26 @@ abstract class AbstractZstdAwareCompressionCodec implements CompressionCodec {
// log from the concrete class
protected final Logger logger = LogManager.getLogger(this.getClass());

private final IORatioMetric decodeRatioMetric;
private final RelativeSpendMetric decodeTimerMetric;

    /**
     * Registers this codec's decode-side metrics under the queue's
     * {@code compression.decode} namespace.
     *
     * @param queueMetric the queue-scoped metric root to register under
     */
    public AbstractZstdAwareCompressionCodec(Metric queueMetric) {
        final NamespacedMetric decodeNamespace = queueMetric.namespace("compression", "decode");
        // compression.decode.ratio.lifetime: bytes-on-disk vs decoded event bytes
        decodeRatioMetric = decodeNamespace.namespace("ratio")
                .register("lifetime", AtomicIORatioMetric.FACTORY);
        // compression.decode.spend.lifetime: fraction of wall-clock time spent decoding
        decodeTimerMetric = decodeNamespace.namespace("spend")
                .register("lifetime", CalculatedRelativeSpendMetric.FACTORY);
    }

@Override
public byte[] decode(byte[] data) {
if (!isZstd(data)) {
decodeRatioMetric.incrementBy(data.length, data.length);
return data;
}
try {
final byte[] decoded = Zstd.decompress(data);
final byte[] decoded = decodeTimerMetric.time(() -> Zstd.decompress(data));
decodeRatioMetric.incrementBy(data.length, decoded.length);
logger.trace("decoded {} -> {}", data.length, decoded.length);
return decoded;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package org.logstash.ackedqueue;

import co.elastic.logstash.api.UserMetric;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.instrument.metrics.AbstractMetric;

import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.util.concurrent.atomic.AtomicReference;

/**
* It uses {@code long} under the hood, and is capable of handling sustained 1GiB/sec
* for ~272 years before overflowing.
*/
class AtomicIORatioMetric extends AbstractMetric<Double> implements IORatioMetric {

public static UserMetric.Factory<IORatioMetric> FACTORY = IORatioMetric.PROVIDER.getFactory(AtomicIORatioMetric::new);

private static final MathContext LIMITED_PRECISION = new MathContext(4, RoundingMode.HALF_UP);
private static final ImmutableRatio ZERO = new ImmutableRatio(0L, 0L);
private static final Logger LOGGER = LogManager.getLogger(AtomicIORatioMetric.class);

private final AtomicReference<ImmutableRatio> atomicReference = new AtomicReference<>(ZERO);
private final Logger logger;

AtomicIORatioMetric(final String name) {
this(name, LOGGER);
}

AtomicIORatioMetric(final String name, final Logger logger) {
super(name);
this.logger = logger;
}

@Override
public Value getLifetime() {
return atomicReference.get();
}

@Override
public void incrementBy(int bytesIn, int bytesOut) {
if (bytesIn < 0 || bytesOut < 0) {
logger.warn("cannot decrement IORatioMetric {}", this.getName());
return;
}
this.atomicReference.getAndUpdate((existing) -> doIncrement(existing, bytesIn, bytesOut));
}

// test injection
void setTo(long bytesIn, long bytesOut) {
this.atomicReference.set(new ImmutableRatio(bytesIn, bytesOut));
}

@Override
public Double getValue() {
final Value snapshot = getLifetime();

final BigDecimal bytesIn = BigDecimal.valueOf(snapshot.bytesIn());
final BigDecimal bytesOut = BigDecimal.valueOf(snapshot.bytesOut());

if (bytesIn.signum() == 0) {
return switch(bytesOut.signum()) {
case -1 -> Double.NEGATIVE_INFINITY;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this possible? Or should incrementBy only accept positive values?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added safeguards to what is acceptable, so this should never get hit, but I would rather leave it in place instead of crashing if the impossible occurs.

case 1 -> Double.POSITIVE_INFINITY;
default -> Double.NaN;
};
}

return bytesOut.divide(bytesIn, LIMITED_PRECISION).doubleValue();
}

public void reset() {
this.atomicReference.set(ZERO);
}

private ImmutableRatio doIncrement(final ImmutableRatio existing, final int bytesIn, final int bytesOut) {

final long combinedBytesIn = existing.bytesIn() + bytesIn;
final long combinedBytesOut = existing.bytesOut() + bytesOut;

if (combinedBytesIn < 0 || combinedBytesOut < 0) {
logger.warn("long overflow; precision will be reduced");
final long reducedBytesIn = Math.addExact(Math.floorDiv(existing.bytesIn(), 2), bytesIn);
final long reducedBytesOut = Math.addExact(Math.floorDiv(existing.bytesOut(), 2), bytesOut);

return new ImmutableRatio(reducedBytesIn, reducedBytesOut);
}

return new ImmutableRatio(combinedBytesIn, combinedBytesOut);
}

public record ImmutableRatio(long bytesIn, long bytesOut) implements Value { }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.logstash.ackedqueue;

import org.logstash.instrument.metrics.AbstractMetric;
import org.logstash.instrument.metrics.UptimeMetric;
import org.logstash.instrument.metrics.timer.TimerMetric;
import org.logstash.instrument.metrics.timer.TimerMetricFactory;

import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;

/**
 * A {@link RelativeSpendMetric} that derives the fraction of wall-clock time
 * spent in tracked work by dividing an internal {@link TimerMetric}'s
 * accumulated spend by an internal {@link UptimeMetric}'s elapsed uptime.
 */
class CalculatedRelativeSpendMetric extends AbstractMetric<Double> implements RelativeSpendMetric {
    // four significant digits is plenty for a human-facing fraction, and a bounded
    // MathContext avoids BigDecimal#divide throwing on non-terminating expansions
    private static final MathContext LIMITED_PRECISION = new MathContext(4, RoundingMode.HALF_UP);

    private final TimerMetric spendMetric;
    private final UptimeMetric uptimeMetric;

    /** Shared factory for registering instances of this metric; final to prevent reassignment. */
    public static final Factory<RelativeSpendMetric> FACTORY = RelativeSpendMetric.PROVIDER.getFactory(CalculatedRelativeSpendMetric::new);

    public CalculatedRelativeSpendMetric(final String name) {
        this(name, TimerMetricFactory.getInstance().create(name + ":spend"), new UptimeMetric(name + ":uptime"));
    }

    // package-private: allows timer/uptime injection for deterministic tests
    CalculatedRelativeSpendMetric(String name, TimerMetric spendMetric, UptimeMetric uptimeMetric) {
        super(name);
        this.spendMetric = spendMetric;
        this.uptimeMetric = uptimeMetric;
    }

    @Override
    public <T, E extends Throwable> T time(ExceptionalSupplier<T, E> exceptionalSupplier) throws E {
        return this.spendMetric.time(exceptionalSupplier);
    }

    @Override
    public void reportUntrackedMillis(long untrackedMillis) {
        this.spendMetric.reportUntrackedMillis(untrackedMillis);
    }

    /**
     * @return spend divided by uptime; when uptime is zero the ratio is
     *         undefined, so report signed infinity (or {@code 0.0} when no
     *         spend has accrued either) instead of throwing.
     */
    @Override
    public Double getValue() {
        final BigDecimal spend = BigDecimal.valueOf(spendMetric.getValue());
        final BigDecimal uptime = BigDecimal.valueOf(uptimeMetric.getValue());

        // exhaustive switch expression guarantees we never reach the divide
        // with a zero uptime (the original statement-switch fell through)
        if (uptime.signum() == 0) {
            return switch (spend.signum()) {
                case -1 -> Double.NEGATIVE_INFINITY;
                case 0 -> 0.0;
                default -> Double.POSITIVE_INFINITY;
            };
        }

        return spend.divide(uptime, LIMITED_PRECISION).doubleValue();
    }
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.logstash.ackedqueue;

import co.elastic.logstash.api.Metric;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.ackedqueue.ZstdEnabledCompressionCodec.Goal;
import org.logstash.plugins.NamespacedMetricImpl;

public interface CompressionCodec {
Logger LOGGER = LogManager.getLogger(CompressionCodec.class);
Expand All @@ -26,33 +29,41 @@ public byte[] decode(byte[] data) {
}
};

static CompressionCodec fromConfigValue(final String configValue) {
return fromConfigValue(configValue, LOGGER);
@FunctionalInterface
interface Factory {
CompressionCodec create(final Metric metric);
default CompressionCodec create() {
return create(NamespacedMetricImpl.getNullMetric());
}
}

static CompressionCodec fromConfigValue(final String configValue, final Logger logger) {
return switch (configValue) {
case "disabled" -> {
static CompressionCodec.Factory fromConfigValue(final String configValue, final Logger logger) {
return switch(configValue) {
case "disabled" -> (metric) -> {
logger.warn("compression support has been disabled");
yield CompressionCodec.NOOP;
}
case "none" -> {
return CompressionCodec.NOOP;
};
case "none" -> (metric) -> {
logger.info("compression support is enabled (read-only)");
yield ZstdAwareCompressionCodec.getInstance();
}
case "speed" -> {
return new ZstdAwareCompressionCodec(metric);
};
case "speed" -> (metric) -> {
logger.info("compression support is enabled (goal: speed)");
yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.SPEED);
}
case "balanced" -> {
return new ZstdEnabledCompressionCodec(Goal.SPEED, metric);
};
case "balanced" -> (metric) -> {
logger.info("compression support is enabled (goal: balanced)");
yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.BALANCED);
}
case "size" -> {
return new ZstdEnabledCompressionCodec(Goal.BALANCED, metric);
};
case "size" -> (metric) -> {
logger.info("compression support is enabled (goal: size)");
yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.SIZE);
}
return new ZstdEnabledCompressionCodec(Goal.SIZE, metric);
};
default -> throw new IllegalArgumentException(String.format("Unsupported compression setting `%s`", configValue));
};
}

static CompressionCodec.Factory fromConfigValue(final String configValue) {
return fromConfigValue(configValue, LOGGER);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.logstash.ackedqueue;

import co.elastic.logstash.api.UserMetric;
import org.logstash.instrument.metrics.MetricType;

/**
* A {@code IORatioMetric} is a custom metric that tracks the ratio of input to output.
*/
/**
 * A {@code IORatioMetric} is a custom metric that tracks the ratio of input to output.
 */
interface IORatioMetric extends UserMetric<Double>, org.logstash.instrument.metrics.Metric<Double> {
    // the lifetime ratio expressed as a double (bytesOut / bytesIn)
    Double getValue();

    // raw snapshot of the lifetime byte counters backing the ratio
    Value getLifetime();

    // record one observation of bytesIn consumed and bytesOut produced
    void incrementBy(int bytesIn, int bytesOut);

    @Override
    default MetricType getType() {
        return MetricType.USER;
    }

    // NOTE: at 100GiB/sec, this value type has capacity for ~272 years.
    interface Value {
        long bytesIn();

        long bytesOut();
    }

    // null-object implementation: NaN value, no-op increments
    Provider<IORatioMetric> PROVIDER = new Provider<>(IORatioMetric.class, new IORatioMetric() {
        @Override
        public Double getValue() {
            return Double.NaN;
        }

        @Override
        public Value getLifetime() {
            // NOTE(review): returns null rather than a zero-valued snapshot — callers must null-check
            return null;
        }

        @Override
        public void incrementBy(int bytesIn, int bytesOut) {
            // no-op
        }

        @Override
        public String getName() {
            return "NULL";
        }
    });
}
Loading
Loading