diff --git a/docs/static/spec/openapi/logstash-api.yaml b/docs/static/spec/openapi/logstash-api.yaml index b979006b7dd..c5ac40b7374 100644 --- a/docs/static/spec/openapi/logstash-api.yaml +++ b/docs/static/spec/openapi/logstash-api.yaml @@ -2340,6 +2340,59 @@ components: max_queue_size_in_bytes: type: integer format: int64 + compression: + type: object + properties: + encode: + type: object + properties: + ratio: + type: object + description: the ratio of event representation on disk to event size in bytes + properties: + lifetime: + oneOf: + - type: number + - enum: + - "Infinity" + - "NaN" + - "-Infinity" + spend: + type: object + description: the fraction of wall-clock time spent encoding events + properties: + lifetime: + oneOf: + - type: number + - enum: + - "Infinity" + - "NaN" + - "-Infinity" + decode: + type: object + properties: + ratio: + type: object + description: the ratio of event size in bytes to its representation on disk + properties: + lifetime: + oneOf: + - type: number + - enum: + - "Infinity" + - "NaN" + - "-Infinity" + spend: + type: object + description: the fraction of wall-clock time spent decoding events + properties: + lifetime: + oneOf: + - type: number + - enum: + - "Infinity" + - "NaN" + - "-Infinity" - type: object description: "The metrics of memory queue." 
required: diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java index 1cb54c98a77..9cf159d4ce4 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java @@ -1,5 +1,7 @@ package org.logstash.ackedqueue; +import co.elastic.logstash.api.Metric; +import co.elastic.logstash.api.NamespacedMetric; import com.github.luben.zstd.Zstd; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -13,13 +15,26 @@ abstract class AbstractZstdAwareCompressionCodec implements CompressionCodec { // log from the concrete class protected final Logger logger = LogManager.getLogger(this.getClass()); + private final IORatioMetric decodeRatioMetric; + private final RelativeSpendMetric decodeTimerMetric; + + public AbstractZstdAwareCompressionCodec(Metric queueMetric) { + final NamespacedMetric decodeNamespace = queueMetric.namespace("compression", "decode"); + decodeRatioMetric = decodeNamespace.namespace("ratio") + .register("lifetime", AtomicIORatioMetric.FACTORY); + decodeTimerMetric = decodeNamespace.namespace("spend") + .register("lifetime", CalculatedRelativeSpendMetric.FACTORY); + } + @Override public byte[] decode(byte[] data) { if (!isZstd(data)) { + decodeRatioMetric.incrementBy(data.length, data.length); return data; } try { - final byte[] decoded = Zstd.decompress(data); + final byte[] decoded = decodeTimerMetric.time(() -> Zstd.decompress(data)); + decodeRatioMetric.incrementBy(data.length, decoded.length); logger.trace("decoded {} -> {}", data.length, decoded.length); return decoded; } catch (Exception e) { diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/AtomicIORatioMetric.java 
b/logstash-core/src/main/java/org/logstash/ackedqueue/AtomicIORatioMetric.java new file mode 100644 index 00000000000..fc3426a515a --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/AtomicIORatioMetric.java @@ -0,0 +1,95 @@ +package org.logstash.ackedqueue; + +import co.elastic.logstash.api.UserMetric; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.logstash.instrument.metrics.AbstractMetric; + +import java.math.BigDecimal; +import java.math.MathContext; +import java.math.RoundingMode; +import java.util.concurrent.atomic.AtomicReference; + +/** + * It uses {@code long} under the hood, and is capable of handling sustained 1GiB/sec + * for ~272 years before overflowing. + */ +class AtomicIORatioMetric extends AbstractMetric implements IORatioMetric { + + public static UserMetric.Factory FACTORY = IORatioMetric.PROVIDER.getFactory(AtomicIORatioMetric::new); + + private static final MathContext LIMITED_PRECISION = new MathContext(4, RoundingMode.HALF_UP); + private static final ImmutableRatio ZERO = new ImmutableRatio(0L, 0L); + private static final Logger LOGGER = LogManager.getLogger(AtomicIORatioMetric.class); + + private final AtomicReference atomicReference = new AtomicReference<>(ZERO); + private final Logger logger; + + AtomicIORatioMetric(final String name) { + this(name, LOGGER); + } + + AtomicIORatioMetric(final String name, final Logger logger) { + super(name); + this.logger = logger; + } + + @Override + public Value getLifetime() { + return atomicReference.get(); + } + + @Override + public void incrementBy(int bytesIn, int bytesOut) { + if (bytesIn < 0 || bytesOut < 0) { + logger.warn("cannot decrement IORatioMetric {}", this.getName()); + return; + } + this.atomicReference.getAndUpdate((existing) -> doIncrement(existing, bytesIn, bytesOut)); + } + + // test injection + void setTo(long bytesIn, long bytesOut) { + this.atomicReference.set(new ImmutableRatio(bytesIn, bytesOut)); + } + + 
@Override + public Double getValue() { + final Value snapshot = getLifetime(); + + final BigDecimal bytesIn = BigDecimal.valueOf(snapshot.bytesIn()); + final BigDecimal bytesOut = BigDecimal.valueOf(snapshot.bytesOut()); + + if (bytesIn.signum() == 0) { + return switch(bytesOut.signum()) { + case -1 -> Double.NEGATIVE_INFINITY; + case 1 -> Double.POSITIVE_INFINITY; + default -> Double.NaN; + }; + } + + return bytesOut.divide(bytesIn, LIMITED_PRECISION).doubleValue(); + } + + public void reset() { + this.atomicReference.set(ZERO); + } + + private ImmutableRatio doIncrement(final ImmutableRatio existing, final int bytesIn, final int bytesOut) { + + final long combinedBytesIn = existing.bytesIn() + bytesIn; + final long combinedBytesOut = existing.bytesOut() + bytesOut; + + if (combinedBytesIn < 0 || combinedBytesOut < 0) { + logger.warn("long overflow; precision will be reduced"); + final long reducedBytesIn = Math.addExact(Math.floorDiv(existing.bytesIn(), 2), bytesIn); + final long reducedBytesOut = Math.addExact(Math.floorDiv(existing.bytesOut(), 2), bytesOut); + + return new ImmutableRatio(reducedBytesIn, reducedBytesOut); + } + + return new ImmutableRatio(combinedBytesIn, combinedBytesOut); + } + + public record ImmutableRatio(long bytesIn, long bytesOut) implements Value { } +} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/CalculatedRelativeSpendMetric.java b/logstash-core/src/main/java/org/logstash/ackedqueue/CalculatedRelativeSpendMetric.java new file mode 100644 index 00000000000..c70adcba7a3 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/CalculatedRelativeSpendMetric.java @@ -0,0 +1,58 @@ +package org.logstash.ackedqueue; + +import org.logstash.instrument.metrics.AbstractMetric; +import org.logstash.instrument.metrics.UptimeMetric; +import org.logstash.instrument.metrics.timer.TimerMetric; +import org.logstash.instrument.metrics.timer.TimerMetricFactory; + +import java.math.BigDecimal; +import 
java.math.MathContext; +import java.math.RoundingMode; + +class CalculatedRelativeSpendMetric extends AbstractMetric implements RelativeSpendMetric { + private static final MathContext LIMITED_PRECISION = new MathContext(4, RoundingMode.HALF_UP); + + private final TimerMetric spendMetric; + private final UptimeMetric uptimeMetric; + + public static Factory FACTORY = RelativeSpendMetric.PROVIDER.getFactory(CalculatedRelativeSpendMetric::new); + + public CalculatedRelativeSpendMetric(final String name) { + this(name, TimerMetricFactory.getInstance().create(name + ":spend"), new UptimeMetric(name + ":uptime")); + } + + CalculatedRelativeSpendMetric(String name, TimerMetric spendMetric, UptimeMetric uptimeMetric) { + super(name); + this.spendMetric = spendMetric; + this.uptimeMetric = uptimeMetric; + } + + @Override + public T time(ExceptionalSupplier exceptionalSupplier) throws E { + return this.spendMetric.time(exceptionalSupplier); + } + + @Override + public void reportUntrackedMillis(long untrackedMillis) { + this.spendMetric.reportUntrackedMillis(untrackedMillis); + } + + @Override + public Double getValue() { + BigDecimal spend = BigDecimal.valueOf(spendMetric.getValue()); + BigDecimal uptime = BigDecimal.valueOf(uptimeMetric.getValue()); + + if (uptime.signum() == 0) { + switch (spend.signum()) { + case -1: + return Double.NEGATIVE_INFINITY; + case 0: + return 0.0; + case +1: + return Double.POSITIVE_INFINITY; + } + } + + return spend.divide(uptime, LIMITED_PRECISION).doubleValue(); + } +} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java index b1f99cf9980..848de0ce9a4 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java @@ -1,7 +1,10 @@ package org.logstash.ackedqueue; +import co.elastic.logstash.api.Metric; import 
org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.logstash.ackedqueue.ZstdEnabledCompressionCodec.Goal; +import org.logstash.plugins.NamespacedMetricImpl; public interface CompressionCodec { Logger LOGGER = LogManager.getLogger(CompressionCodec.class); @@ -26,33 +29,41 @@ public byte[] decode(byte[] data) { } }; - static CompressionCodec fromConfigValue(final String configValue) { - return fromConfigValue(configValue, LOGGER); + @FunctionalInterface + interface Factory { + CompressionCodec create(final Metric metric); + default CompressionCodec create() { + return create(NamespacedMetricImpl.getNullMetric()); + } } - static CompressionCodec fromConfigValue(final String configValue, final Logger logger) { - return switch (configValue) { - case "disabled" -> { + static CompressionCodec.Factory fromConfigValue(final String configValue, final Logger logger) { + return switch(configValue) { + case "disabled" -> (metric) -> { logger.warn("compression support has been disabled"); - yield CompressionCodec.NOOP; - } - case "none" -> { + return CompressionCodec.NOOP; + }; + case "none" -> (metric) -> { logger.info("compression support is enabled (read-only)"); - yield ZstdAwareCompressionCodec.getInstance(); - } - case "speed" -> { + return new ZstdAwareCompressionCodec(metric); + }; + case "speed" -> (metric) -> { logger.info("compression support is enabled (goal: speed)"); - yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.SPEED); - } - case "balanced" -> { + return new ZstdEnabledCompressionCodec(Goal.SPEED, metric); + }; + case "balanced" -> (metric) -> { logger.info("compression support is enabled (goal: balanced)"); - yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.BALANCED); - } - case "size" -> { + return new ZstdEnabledCompressionCodec(Goal.BALANCED, metric); + }; + case "size" -> (metric) -> { logger.info("compression support is enabled (goal: size)"); - yield new 
ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.SIZE); - } + return new ZstdEnabledCompressionCodec(Goal.SIZE, metric); + }; default -> throw new IllegalArgumentException(String.format("Unsupported compression setting `%s`", configValue)); }; } + + static CompressionCodec.Factory fromConfigValue(final String configValue) { + return fromConfigValue(configValue, LOGGER); + } } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/IORatioMetric.java b/logstash-core/src/main/java/org/logstash/ackedqueue/IORatioMetric.java new file mode 100644 index 00000000000..85c8840bdc1 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/IORatioMetric.java @@ -0,0 +1,49 @@ +package org.logstash.ackedqueue; + +import co.elastic.logstash.api.UserMetric; +import org.logstash.instrument.metrics.MetricType; + +/** + * An {@code IORatioMetric} is a custom metric that tracks the ratio of input to output. + */ +interface IORatioMetric extends UserMetric, org.logstash.instrument.metrics.Metric { + Double getValue(); + + Value getLifetime(); + + void incrementBy(int bytesIn, int bytesOut); + + @Override + default MetricType getType() { + return MetricType.USER; + } + + // NOTE: at 1GiB/sec, this value type has capacity for ~272 years. 
+ interface Value { + long bytesIn(); + + long bytesOut(); + } + + Provider PROVIDER = new Provider<>(IORatioMetric.class, new IORatioMetric() { + @Override + public Double getValue() { + return Double.NaN; + } + + @Override + public Value getLifetime() { + return null; + } + + @Override + public void incrementBy(int bytesIn, int bytesOut) { + // no-op + } + + @Override + public String getName() { + return "NULL"; + } + }); +} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index ceace485888..42771a1b148 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -37,6 +37,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import co.elastic.logstash.api.Metric; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.logstash.FileLockFactory; @@ -46,6 +47,7 @@ import org.logstash.ackedqueue.io.MmapPageIOV2; import org.logstash.ackedqueue.io.PageIO; import org.logstash.common.FsUtil; +import org.logstash.plugins.NamespacedMetricImpl; /** * Persistent queue implementation. 
@@ -96,7 +98,15 @@ public final class Queue implements Closeable { private static final Logger logger = LogManager.getLogger(Queue.class); + private final Metric metric; + + public Queue(Settings settings) { + this(settings, null); + } + + public Queue(Settings settings, Metric metric) { + this.metric = Objects.requireNonNullElseGet(metric, NamespacedMetricImpl::getNullMetric); try { final Path queueDir = Paths.get(settings.getDirPath()); // Files.createDirectories raises a FileAlreadyExistsException @@ -113,7 +123,7 @@ public Queue(Settings settings) { this.maxBytes = settings.getQueueMaxBytes(); this.checkpointIO = new FileCheckpointIO(dirPath, settings.getCheckpointRetry()); this.elementClass = settings.getElementClass(); - this.compressionCodec = settings.getCompressionCodec(); + this.compressionCodec = settings.getCompressionCodecFactory().create(metric); this.tailPages = new ArrayList<>(); this.unreadTailPages = new ArrayList<>(); this.closed = new AtomicBoolean(true); // not yet opened diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java index 6d64bd4d9e4..de692ee8561 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java @@ -25,6 +25,7 @@ import java.nio.file.Path; import java.nio.file.Paths; +import co.elastic.logstash.api.NamespacedMetric; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jruby.Ruby; @@ -40,6 +41,8 @@ import org.logstash.common.SettingKeyDefinitions; import org.logstash.execution.AbstractWrappedQueueExt; import org.logstash.ext.JrubyWrappedSynchronousQueueExt; +import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; +import org.logstash.plugins.NamespacedMetricImpl; import static org.logstash.common.SettingKeyDefinitions.*; @@ -78,9 +81,16 @@ public 
QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); } + @Deprecated @JRubyMethod(meta = true) public static AbstractWrappedQueueExt create(final ThreadContext context, final IRubyObject recv, - final IRubyObject settings) throws IOException { + final IRubyObject settings) throws IOException { + return create(context, settings, null); + } + + public static AbstractWrappedQueueExt create(final ThreadContext context, + final IRubyObject settings, + final AbstractNamespacedMetricExt metric) throws IOException { final String type = getSetting(context, settings, QUEUE_TYPE_CONTEXT_NAME).asJavaString(); final BatchMetricMode batchMetricMode = decodeBatchMetricMode(context, settings); if (PERSISTED_TYPE.equals(type)) { @@ -93,7 +103,9 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final Files.createDirectories(queuePath); } - return JRubyWrappedAckedQueueExt.create(context, queueSettings, batchMetricMode); + final NamespacedMetric namespacedMetric = getMetric(context, metric); + return JRubyWrappedAckedQueueExt.create(context, queueSettings, namespacedMetric, batchMetricMode); + } else if (MEMORY_TYPE.equals(type)) { final int batchSize = getSetting(context, settings, SettingKeyDefinitions.PIPELINE_BATCH_SIZE) .convertToInteger().getIntValue(); @@ -103,11 +115,11 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final return JrubyWrappedSynchronousQueueExt.create(context, queueSize, batchMetricMode); } else { throw context.runtime.newRaiseException( - RubyUtil.CONFIGURATION_ERROR_CLASS, - String.format( - "Invalid setting `%s` for `queue.type`, supported types are: 'memory' or 'persisted'", - type - ) + RubyUtil.CONFIGURATION_ERROR_CLASS, + String.format( + "Invalid setting `%s` for `queue.type`, supported types are: 'memory' or 'persisted'", + type + ) ); } } @@ -122,6 +134,13 @@ private static BatchMetricMode decodeBatchMetricMode(ThreadContext context, IRub return 
BatchMetricMode.valueOf(batchMetricModeStr.toUpperCase()); } + private static NamespacedMetric getMetric(final ThreadContext context, final AbstractNamespacedMetricExt metric) { + if ( metric == null ) { + return NamespacedMetricImpl.getNullMetric(); + } + return new NamespacedMetricImpl(context, metric); + } + private static IRubyObject getSetting(final ThreadContext context, final IRubyObject settings, final String name) { return settings.callMethod(context, "get_value", context.runtime.newString(name)); @@ -142,11 +161,11 @@ private static Settings extractQueueSettings(final IRubyObject settings) { .checkpointMaxAcks(getSetting(context, settings, QUEUE_CHECKPOINT_ACKS).toJava(Integer.class)) .checkpointRetry(getSetting(context, settings, QUEUE_CHECKPOINT_RETRY).isTrue()) .queueMaxBytes(getSetting(context, settings, QUEUE_MAX_BYTES).toJava(Integer.class)) - .compressionCodec(extractConfiguredCodec(settings)) + .compressionCodecFactory(extractConfiguredCodec(settings)) .build(); } - private static CompressionCodec extractConfiguredCodec(final IRubyObject settings) { + private static CompressionCodec.Factory extractConfiguredCodec(final IRubyObject settings) { final ThreadContext context = settings.getRuntime().getCurrentContext(); final String compressionSetting = getSetting(context, settings, QUEUE_COMPRESSION).asJavaString(); return CompressionCodec.fromConfigValue(compressionSetting, LOGGER); diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/RelativeSpendMetric.java b/logstash-core/src/main/java/org/logstash/ackedqueue/RelativeSpendMetric.java new file mode 100644 index 00000000000..615f5612c60 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/RelativeSpendMetric.java @@ -0,0 +1,35 @@ +package org.logstash.ackedqueue; + +import co.elastic.logstash.api.TimerMetric; +import co.elastic.logstash.api.UserMetric; +import org.logstash.instrument.metrics.MetricType; +import org.logstash.instrument.metrics.timer.NullTimerMetric; + 
+interface RelativeSpendMetric extends UserMetric, org.logstash.instrument.metrics.Metric, TimerMetric { + + default MetricType getType() { + return MetricType.USER; + } + + Provider PROVIDER = new Provider<>(RelativeSpendMetric.class, new RelativeSpendMetric() { + @Override + public T time(ExceptionalSupplier exceptionalSupplier) throws E { + return NullTimerMetric.getInstance().time(exceptionalSupplier); + } + + @Override + public void reportUntrackedMillis(long untrackedMillis) { + // no-op + } + + @Override + public Double getValue() { + return 0.0; + } + + @Override + public String getName() { + return "NULL"; + } + }); +} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java index 36d4b60e3b2..2e4646281f9 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java @@ -44,7 +44,7 @@ public interface Settings { boolean getCheckpointRetry(); - CompressionCodec getCompressionCodec(); + CompressionCodec.Factory getCompressionCodecFactory(); /** * Validate and return the settings, or throw descriptive {@link QueueRuntimeException} @@ -91,7 +91,7 @@ interface Builder { Builder checkpointRetry(boolean checkpointRetry); - Builder compressionCodec(CompressionCodec compressionCodec); + Builder compressionCodecFactory(CompressionCodec.Factory compressionCodecFactory); Settings build(); } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java b/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java index 923217af366..c5a95c3b67d 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java @@ -31,7 +31,7 @@ public class SettingsImpl implements Settings { private final int checkpointMaxAcks; private final int checkpointMaxWrites; private final boolean 
checkpointRetry; - private final CompressionCodec compressionCodec; + private final CompressionCodec.Factory compressionCodec; public static Builder builder(final Settings settings) { return new BuilderImpl(settings); @@ -50,7 +50,7 @@ private SettingsImpl(final BuilderImpl builder) { this.checkpointMaxAcks = builder.checkpointMaxAcks; this.checkpointMaxWrites = builder.checkpointMaxWrites; this.checkpointRetry = builder.checkpointRetry; - this.compressionCodec = builder.compressionCodec; + this.compressionCodec = builder.compressionCodecFactory; } @Override @@ -94,7 +94,7 @@ public boolean getCheckpointRetry() { } @Override - public CompressionCodec getCompressionCodec() { + public CompressionCodec.Factory getCompressionCodecFactory() { return this.compressionCodec; } @@ -147,7 +147,7 @@ private static final class BuilderImpl implements Builder { private boolean checkpointRetry; - private CompressionCodec compressionCodec; + private CompressionCodec.Factory compressionCodecFactory; private BuilderImpl(final String dirForFiles) { this.dirForFiles = dirForFiles; @@ -157,7 +157,7 @@ private BuilderImpl(final String dirForFiles) { this.maxUnread = DEFAULT_MAX_UNREAD; this.checkpointMaxAcks = DEFAULT_CHECKPOINT_MAX_ACKS; this.checkpointMaxWrites = DEFAULT_CHECKPOINT_MAX_WRITES; - this.compressionCodec = CompressionCodec.NOOP; + this.compressionCodecFactory = (metric) -> CompressionCodec.NOOP; this.checkpointRetry = false; } @@ -170,7 +170,7 @@ private BuilderImpl(final Settings settings) { this.checkpointMaxAcks = settings.getCheckpointMaxAcks(); this.checkpointMaxWrites = settings.getCheckpointMaxWrites(); this.checkpointRetry = settings.getCheckpointRetry(); - this.compressionCodec = settings.getCompressionCodec(); + this.compressionCodecFactory = settings.getCompressionCodecFactory(); } @Override @@ -216,8 +216,8 @@ public Builder checkpointRetry(final boolean checkpointRetry) { } @Override - public Builder compressionCodec(CompressionCodec compressionCodec) { - 
this.compressionCodec = compressionCodec; + public Builder compressionCodecFactory(CompressionCodec.Factory compressionCodec) { + this.compressionCodecFactory = compressionCodec; return this; } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdAwareCompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdAwareCompressionCodec.java index f82b4b75f2e..c4797e41448 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdAwareCompressionCodec.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdAwareCompressionCodec.java @@ -1,14 +1,15 @@ package org.logstash.ackedqueue; +import co.elastic.logstash.api.Metric; + /** * A {@link ZstdAwareCompressionCodec} is an {@link CompressionCodec} that can decode deflate-compressed * bytes, but performs no compression when encoding. */ class ZstdAwareCompressionCodec extends AbstractZstdAwareCompressionCodec { - private static final ZstdAwareCompressionCodec INSTANCE = new ZstdAwareCompressionCodec(); - static ZstdAwareCompressionCodec getInstance() { - return INSTANCE; + public ZstdAwareCompressionCodec(Metric queueMetric) { + super(queueMetric); } @Override diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java index fa5a22b3ee6..8a7ee5f246b 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java @@ -1,5 +1,7 @@ package org.logstash.ackedqueue; +import co.elastic.logstash.api.Metric; +import co.elastic.logstash.api.NamespacedMetric; import com.github.luben.zstd.Zstd; /** @@ -24,14 +26,25 @@ public enum Goal { private final int internalLevel; - ZstdEnabledCompressionCodec(final Goal internalLevel) { + private final IORatioMetric encodeRatioMetric; + private final RelativeSpendMetric encodeTimerMetric; + + 
ZstdEnabledCompressionCodec(final Goal internalLevel, final Metric queueMetric) { + super(queueMetric); this.internalLevel = internalLevel.internalLevel; + + final NamespacedMetric encodeNamespace = queueMetric.namespace("compression", "encode"); + encodeRatioMetric = encodeNamespace.namespace("ratio") + .register("lifetime", AtomicIORatioMetric.FACTORY); + encodeTimerMetric = encodeNamespace.namespace("spend") + .register("lifetime", CalculatedRelativeSpendMetric.FACTORY); } @Override public byte[] encode(byte[] data) { try { - final byte[] encoded = Zstd.compress(data, internalLevel); + final byte[] encoded = encodeTimerMetric.time(() -> Zstd.compress(data, internalLevel)); + encodeRatioMetric.incrementBy(data.length, encoded.length); logger.trace("encoded {} -> {}", data.length, encoded.length); return encoded; } catch (Exception e) { diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java index 770824935ce..d98d49a3362 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Objects; +import co.elastic.logstash.api.Metric; import org.jruby.Ruby; import org.jruby.RubyBoolean; import org.jruby.RubyClass; @@ -42,6 +43,8 @@ import org.logstash.ackedqueue.QueueExceptionMessages; import org.logstash.ackedqueue.Settings; import org.logstash.ackedqueue.SettingsImpl; +import org.logstash.plugins.NamespacedMetricImpl; + /** * JRuby extension to wrap a persistent queue instance. 
@@ -62,9 +65,14 @@ public Queue getQueue() { return this.queue; } + @Deprecated public static JRubyAckedQueueExt create(final Settings settings) { + return create(settings, NamespacedMetricImpl.getNullMetric()); + } + + public static JRubyAckedQueueExt create(final Settings settings, final Metric metric) { JRubyAckedQueueExt queueExt = new JRubyAckedQueueExt(RubyUtil.RUBY, RubyUtil.ACKED_QUEUE_CLASS); - queueExt.queue = new Queue(settings); + queueExt.queue = new Queue(settings, metric); return queueExt; } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java index 42d11da8875..ca58436bd4d 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Objects; +import co.elastic.logstash.api.Metric; import org.jruby.Ruby; import org.jruby.RubyBoolean; import org.jruby.RubyClass; @@ -40,6 +41,8 @@ import org.logstash.ext.JrubyAckedReadClientExt; import org.logstash.ext.JrubyAckedWriteClientExt; import org.logstash.ext.JrubyEventExtLibrary; +import org.logstash.instrument.metrics.AbstractMetricExt; +import org.logstash.plugins.NamespacedMetricImpl; /** * JRuby extension @@ -52,8 +55,9 @@ public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt { private JRubyAckedQueueExt queue; private QueueFactoryExt.BatchMetricMode batchMetricMode; - @JRubyMethod(required=2) - public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject settings, IRubyObject batchMetricMode) throws IOException { + @JRubyMethod(required=2, optional=1) + public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] args) throws IOException { + final IRubyObject settings = args[0]; if (!JavaUtil.isJavaObject(settings)) { 
// We should never get here, but previously had an initialize method // that took 7 technically-optional ordered parameters. @@ -64,6 +68,7 @@ public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject s settings)); } + final IRubyObject batchMetricMode = args[1]; Objects.requireNonNull(batchMetricMode, "batchMetricMode setting must be non-null"); if (!JavaUtil.isJavaObject(batchMetricMode)) { throw new IllegalArgumentException( @@ -73,9 +78,10 @@ public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject s batchMetricMode)); } + final Metric metric = getApiMetric(args.length > 2 ? args[2] : null); Settings javaSettings = JavaUtil.unwrapJavaObject(settings); - this.queue = JRubyAckedQueueExt.create(javaSettings); + this.queue = JRubyAckedQueueExt.create(javaSettings, metric); this.batchMetricMode = JavaUtil.unwrapJavaObject(batchMetricMode); this.queue.open(); @@ -83,14 +89,23 @@ public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject s return this; } - public static JRubyWrappedAckedQueueExt create(ThreadContext context, Settings settings, QueueFactoryExt.BatchMetricMode batchMetricMode) throws IOException { - return new JRubyWrappedAckedQueueExt(context.runtime, RubyUtil.WRAPPED_ACKED_QUEUE_CLASS, settings, batchMetricMode); + public static JRubyWrappedAckedQueueExt create(ThreadContext context, Settings settings, Metric metric, QueueFactoryExt.BatchMetricMode batchMetricMode) throws IOException { + return new JRubyWrappedAckedQueueExt(context.runtime, RubyUtil.WRAPPED_ACKED_QUEUE_CLASS, settings, metric, batchMetricMode); } + @Deprecated public JRubyWrappedAckedQueueExt(Ruby runtime, RubyClass metaClass, Settings settings, QueueFactoryExt.BatchMetricMode batchMetricMode) throws IOException { + this(runtime, metaClass, settings, NamespacedMetricImpl.getNullMetric(), batchMetricMode); + } + + public JRubyWrappedAckedQueueExt(final Ruby runtime, + final RubyClass metaClass, + final Settings settings, + 
final Metric metric, + final QueueFactoryExt.BatchMetricMode batchMetricMode) throws IOException { super(runtime, metaClass); this.batchMetricMode = Objects.requireNonNull(batchMetricMode, "batchMetricMode setting must be non-null"); - this.queue = JRubyAckedQueueExt.create(settings); + this.queue = JRubyAckedQueueExt.create(settings, metric); this.queue.open(); } @@ -98,6 +113,19 @@ public JRubyWrappedAckedQueueExt(final Ruby runtime, final RubyClass metaClass) super(runtime, metaClass); } + private static Metric getApiMetric(IRubyObject metric) { + if (Objects.isNull(metric) || metric.isNil()) { + return NamespacedMetricImpl.getNullMetric(); + } + if (metric instanceof AbstractMetricExt rubyExtensionMetric) { + return rubyExtensionMetric.asApiMetric(); + } + if (Metric.class.isAssignableFrom(metric.getJavaClass())) { + return metric.toJava(Metric.class); + } + throw new IllegalArgumentException(String.format("Object <%s> could not be converted to a metric", metric.inspect())); + } + @JRubyMethod(name = "queue") public JRubyAckedQueueExt rubyGetQueue() { return queue; diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index 5bf98abff19..b90b77de54f 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -300,8 +300,9 @@ private AbstractPipelineExt initialize(final ThreadContext context, */ @JRubyMethod(name = "open_queue") public final IRubyObject openQueue(final ThreadContext context) { + final AbstractNamespacedMetricExt queueNamespace = metric.namespace(context, pipelineNamespacedPath(QUEUE_KEY)); try { - queue = QueueFactoryExt.create(context, null, settings); + queue = QueueFactoryExt.create(context, settings, queueNamespace); } catch (final Exception ex) { LOGGER.error("Logstash failed to create queue.", ex); throw new 
IllegalStateException(ex); diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/AbstractMetricExt.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/AbstractMetricExt.java index a0298faf18c..7b1ada37c13 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/AbstractMetricExt.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/AbstractMetricExt.java @@ -52,6 +52,8 @@ public final IRubyObject collector(final ThreadContext context) { return getCollector(context); } + public abstract co.elastic.logstash.api.Metric asApiMetric(); + protected abstract AbstractNamespacedMetricExt createNamespaced( ThreadContext context, IRubyObject name ); diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/AbstractNamespacedMetricExt.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/AbstractNamespacedMetricExt.java index 65f68633138..4dfc8b48555 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/AbstractNamespacedMetricExt.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/AbstractNamespacedMetricExt.java @@ -20,6 +20,7 @@ package org.logstash.instrument.metrics; +import co.elastic.logstash.api.Metric; import org.jruby.Ruby; import org.jruby.RubyArray; import org.jruby.RubyClass; @@ -28,6 +29,7 @@ import org.jruby.runtime.Block; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.plugins.NamespacedMetricImpl; @JRubyClass(name = "AbstractNamespacedMetric") public abstract class AbstractNamespacedMetricExt extends AbstractMetricExt { @@ -109,6 +111,11 @@ protected abstract IRubyObject doReportTime(ThreadContext context, protected abstract IRubyObject doDecrement(ThreadContext context, IRubyObject[] args); + @Override + public Metric asApiMetric() { + return new NamespacedMetricImpl(getRuntime().getCurrentContext(), this); + } + protected abstract IRubyObject doRegister(ThreadContext 
context, IRubyObject key, Block metricSupplier); public abstract AbstractMetricExt getMetric(); diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/AbstractSimpleMetricExt.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/AbstractSimpleMetricExt.java index 8f7f89f5d6a..6f3553f72af 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/AbstractSimpleMetricExt.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/AbstractSimpleMetricExt.java @@ -20,6 +20,7 @@ package org.logstash.instrument.metrics; +import co.elastic.logstash.api.Metric; import org.jruby.Ruby; import org.jruby.RubyClass; import org.jruby.anno.JRubyClass; @@ -27,6 +28,7 @@ import org.jruby.runtime.Block; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.plugins.RootMetricImpl; @JRubyClass(name = "AbstractSimpleMetric") public abstract class AbstractSimpleMetricExt extends AbstractMetricExt { @@ -37,6 +39,11 @@ public abstract class AbstractSimpleMetricExt extends AbstractMetricExt { super(runtime, metaClass); } + @Override + public Metric asApiMetric() { + return new RootMetricImpl(getRuntime().getCurrentContext(), this); + } + @JRubyMethod(required = 2, optional = 1) public IRubyObject increment(final ThreadContext context, final IRubyObject[] args) { return doIncrement(context, args); diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/NullNamespacedMetricExt.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/NullNamespacedMetricExt.java index 112b89ac5e0..21eb5c0186b 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/NullNamespacedMetricExt.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/NullNamespacedMetricExt.java @@ -20,6 +20,7 @@ package org.logstash.instrument.metrics; +import co.elastic.logstash.api.Metric; import org.jruby.Ruby; import org.jruby.RubyArray; import 
org.jruby.RubyClass; @@ -31,6 +32,7 @@ import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; +import org.logstash.plugins.NamespacedMetricImpl; @JRubyClass(name = "NamespacedNullMetric", parent = "AbstractNamespacedMetric") public final class NullNamespacedMetricExt extends AbstractNamespacedMetricExt { @@ -69,6 +71,11 @@ public NullNamespacedMetricExt initialize(final ThreadContext context, return this; } + @Override + public Metric asApiMetric() { + return NamespacedMetricImpl.getNullMetric(); + } + @Override protected IRubyObject getCollector(final ThreadContext context) { return metric.collector(context); diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/timer/TimerMetricFactory.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/timer/TimerMetricFactory.java index d2cb39be9e2..04af5f5f790 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/timer/TimerMetricFactory.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/timer/TimerMetricFactory.java @@ -5,6 +5,10 @@ public class TimerMetricFactory { static final TimerMetricFactory INSTANCE = new TimerMetricFactory(); + public static TimerMetricFactory getInstance() { + return INSTANCE; + } + private TimerMetricFactory() { } diff --git a/logstash-core/src/main/java/org/logstash/plugins/NamespacedMetricImpl.java b/logstash-core/src/main/java/org/logstash/plugins/NamespacedMetricImpl.java index 0a76ba59e3f..cc57421e08e 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/NamespacedMetricImpl.java +++ b/logstash-core/src/main/java/org/logstash/plugins/NamespacedMetricImpl.java @@ -24,13 +24,17 @@ import co.elastic.logstash.api.Metric; import co.elastic.logstash.api.NamespacedMetric; import co.elastic.logstash.api.UserMetric; +import org.jruby.Ruby; import org.jruby.RubyArray; import org.jruby.RubyObject; import org.jruby.RubySymbol; import org.jruby.runtime.ThreadContext; 
import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.RubyUtil; import org.logstash.Rubyfier; import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; +import org.logstash.instrument.metrics.NullMetricExt; +import org.logstash.instrument.metrics.NullNamespacedMetricExt; import org.logstash.instrument.metrics.timer.TimerMetric; import java.util.ArrayList; @@ -45,6 +49,25 @@ */ public class NamespacedMetricImpl implements NamespacedMetric { + private static final NamespacedMetric NULL_METRIC; + static { + final Ruby rubyRuntime = RubyUtil.RUBY; + final ThreadContext context = rubyRuntime.getCurrentContext(); + final NullMetricExt nullMetricExt = NullMetricExt.create(); + final AbstractNamespacedMetricExt namespacedMetricExt = NullNamespacedMetricExt.create(nullMetricExt, rubyRuntime.newArray()); + + NULL_METRIC = new NamespacedMetricImpl(context, namespacedMetricExt){ + @Override + public NamespacedMetric namespace(String... key) { + return this; + } + }; + } + + public static NamespacedMetric getNullMetric() { + return NULL_METRIC; + } + private final ThreadContext threadContext; private final AbstractNamespacedMetricExt metrics; diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/AtomicIORatioMetricTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/AtomicIORatioMetricTest.java new file mode 100644 index 00000000000..2d209d2b1c4 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/AtomicIORatioMetricTest.java @@ -0,0 +1,129 @@ +package org.logstash.ackedqueue; + +import org.apache.logging.log4j.Logger; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.Matchers.stringContainsInOrder; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; + +public class AtomicIORatioMetricTest { + @Test + public void test() { + final AtomicIORatioMetric ioRatioMetric = new 
AtomicIORatioMetric("name"); + + assertThat(ioRatioMetric.getValue()).isNaN(); + + ioRatioMetric.incrementBy(1024, 768); + assertThat(ioRatioMetric.getValue()).isEqualTo(0.75); + + ioRatioMetric.incrementBy(256, 128); + assertThat(ioRatioMetric.getValue()).isEqualTo(0.7); + + ioRatioMetric.incrementBy(512, 128); + assertThat(ioRatioMetric.getValue()).isEqualTo(0.5714); + + ioRatioMetric.incrementBy(256, 0); + assertThat(ioRatioMetric.getValue()).isEqualTo(0.5); + + ioRatioMetric.incrementBy(0, 1024); + assertThat(ioRatioMetric.getValue()).isEqualTo(1.0); + + ioRatioMetric.reset(); + assertThat(ioRatioMetric.getLifetime()).satisfies(value -> { + assertThat(value.bytesIn()).isEqualTo(0L); + assertThat(value.bytesOut()).isEqualTo(0L); + }); + + int iterations = 100000000; + int bytesInPerIteration = 4000000; + int bytesOutPerIteration = 3000000; + for (int i = 0; i < iterations; i++) { + ioRatioMetric.incrementBy(bytesInPerIteration, bytesOutPerIteration); + } + assertThat(ioRatioMetric.getLifetime()).satisfies(value -> { + assertThat(value.bytesIn()).isEqualTo(Math.multiplyExact((long)bytesInPerIteration, iterations)); + assertThat(value.bytesOut()).isEqualTo(Math.multiplyExact((long)bytesOutPerIteration, iterations)); + }); + assertThat(ioRatioMetric.getValue()).isEqualTo(0.75); + } + + @Test + public void testZeroBytesInPositiveBytesOut() { + final AtomicIORatioMetric ioRatioMetric = new AtomicIORatioMetric("name"); + + ioRatioMetric.incrementBy(0, 768); + assertThat(ioRatioMetric.getValue()).isEqualTo(Double.POSITIVE_INFINITY); + } + + @Test + public void testNegativeBytesIn() { + final Logger mockLogger = Mockito.mock(Logger.class); + final AtomicIORatioMetric ioRatioMetric = new AtomicIORatioMetric("name", mockLogger); + ioRatioMetric.incrementBy(-1, 768); + + Mockito.verify(mockLogger).warn(argThat(stringContainsInOrder("cannot decrement")), eq(ioRatioMetric.getName())); + } + + @Test + public void testNegativeBytesOut() { + final Logger mockLogger = 
Mockito.mock(Logger.class); + final AtomicIORatioMetric ioRatioMetric = new AtomicIORatioMetric("name", mockLogger); + ioRatioMetric.incrementBy(768, -1); + + Mockito.verify(mockLogger).warn(argThat(stringContainsInOrder("cannot decrement")), eq(ioRatioMetric.getName())); + } + + @Test + public void testZeroBytesInZeroBytesOut() { + final AtomicIORatioMetric ioRatioMetric = new AtomicIORatioMetric("name"); + + ioRatioMetric.incrementBy(0, 0); + assertThat(ioRatioMetric.getValue()).isNaN(); + } + + @Test + public void testLongBytesInOverflow() { + final Logger mockLogger = Mockito.mock(Logger.class); + final AtomicIORatioMetric ioRatioMetric = new AtomicIORatioMetric("name", mockLogger); + ioRatioMetric.setTo(Long.MAX_VALUE, 2L); + + assertThat(ioRatioMetric.getValue()).isEqualTo(2.168E-19); + assertThat(ioRatioMetric.getLifetime()).satisfies(value -> { + assertThat(value.bytesIn()).isEqualTo(Long.MAX_VALUE); + assertThat(value.bytesOut()).isEqualTo(2L); + }); + + //overflow reset + ioRatioMetric.incrementBy(1, 10); + assertThat(ioRatioMetric.getLifetime()).satisfies(value -> { + assertThat(value.bytesIn()).isEqualTo(4611686018427387903L + 1L); + assertThat(value.bytesOut()).isEqualTo(1L + 10L); + }); + Mockito.verify(mockLogger).warn(argThat(stringContainsInOrder("long overflow", "precision", "reduced"))); + } + + @Test + public void testLongBytesOutOverflow() { + final Logger mockLogger = Mockito.mock(Logger.class); + final AtomicIORatioMetric ioRatioMetric = new AtomicIORatioMetric("name", mockLogger); + ioRatioMetric.setTo(2L, Long.MAX_VALUE); + + assertThat(ioRatioMetric.getValue()).isEqualTo(4.612E18); + assertThat(ioRatioMetric.getLifetime()).satisfies(value -> { + assertThat(value.bytesIn()).isEqualTo(2L); + assertThat(value.bytesOut()).isEqualTo(Long.MAX_VALUE); + }); + + //overflow reset/truncate + ioRatioMetric.incrementBy(10, 1); + assertThat(ioRatioMetric.getLifetime()).satisfies(value -> { + assertThat(value.bytesIn()).isEqualTo(1L + 10L); + 
assertThat(value.bytesOut()).isEqualTo(4611686018427387903L + 1L); + }); + Mockito.verify(mockLogger).warn(argThat(stringContainsInOrder("long overflow", "precision", "reduced"))); + } + +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/CalculatedRelativeSpendMetricTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/CalculatedRelativeSpendMetricTest.java new file mode 100644 index 00000000000..fe5067e7b39 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/CalculatedRelativeSpendMetricTest.java @@ -0,0 +1,47 @@ +package org.logstash.ackedqueue; + +import org.junit.Test; +import org.logstash.instrument.metrics.ManualAdvanceClock; +import org.logstash.instrument.metrics.UptimeMetric; +import org.logstash.instrument.metrics.timer.TestTimerMetricFactory; +import org.logstash.instrument.metrics.timer.TimerMetric; + +import java.time.Duration; +import java.time.Instant; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.logstash.instrument.metrics.VisibilityUtil.createUptimeMetric; + +public class CalculatedRelativeSpendMetricTest { + @Test + public void testCalculateRelativeSpendMetric() { + final ManualAdvanceClock clock = new ManualAdvanceClock(Instant.now()); + final TimerMetric burnMetric = new TestTimerMetricFactory(clock::nanoTime).newTimerMetric("burn"); + final UptimeMetric uptimeMetric = createUptimeMetric("wall", clock::nanoTime); + final CalculatedRelativeSpendMetric relativeSpendMetric = new CalculatedRelativeSpendMetric("spend", burnMetric, uptimeMetric); + + clock.advance(Duration.ofMillis(17)); + assertThat(relativeSpendMetric.getValue()).isEqualTo(0.0); + + relativeSpendMetric.time(() -> clock.advance(Duration.ofMillis(17))); + assertThat(relativeSpendMetric.getValue()).isEqualTo(0.5); + + clock.advance(Duration.ofMillis(34)); + assertThat(relativeSpendMetric.getValue()).isEqualTo(0.25); + + relativeSpendMetric.time(() -> 
clock.advance(Duration.ofMillis(147))); + assertThat(relativeSpendMetric.getValue()).isEqualTo(0.7628); + + // nesting for forced concurrency; interleaving validated upstream in TimerMetric + relativeSpendMetric.time(() -> { + clock.advance(Duration.ofMillis(149)); + relativeSpendMetric.time(() -> clock.advance(Duration.ofMillis(842))); + clock.advance(Duration.ofMillis(17)); + }); + assertThat(relativeSpendMetric.getValue()).isEqualTo(1.647); + + // advance wall clock without any new timings + clock.advance(Duration.ofMillis(6833)); + assertThat(relativeSpendMetric.getValue()).isEqualTo(0.25); + } +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java index d1a154bd3d3..3035507f957 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java @@ -23,15 +23,15 @@ public class CompressionCodecTest { static final ImmutableByteArrayBarrier COMPRESSED_DEFAULT = new ImmutableByteArrayBarrier(compress(RAW_BYTES.bytes(), 3)); static final ImmutableByteArrayBarrier COMPRESSED_MAXIMUM = new ImmutableByteArrayBarrier(compress(RAW_BYTES.bytes(), 22)); - private final CompressionCodec codecDisabled = CompressionCodec.fromConfigValue("disabled"); - private final CompressionCodec codecNone = CompressionCodec.fromConfigValue("none"); - private final CompressionCodec codecSpeed = CompressionCodec.fromConfigValue("speed"); - private final CompressionCodec codecBalanced = CompressionCodec.fromConfigValue("balanced"); - private final CompressionCodec codecSize = CompressionCodec.fromConfigValue("size"); + private final CompressionCodec codecDisabled = CompressionCodec.fromConfigValue("disabled").create(); + private final CompressionCodec codecNone = CompressionCodec.fromConfigValue("none").create(); + private final 
CompressionCodec codecSpeed = CompressionCodec.fromConfigValue("speed").create(); + private final CompressionCodec codecBalanced = CompressionCodec.fromConfigValue("balanced").create(); + private final CompressionCodec codecSize = CompressionCodec.fromConfigValue("size").create(); @Test public void testDisabledCompressionCodecDecodes() throws Exception { - final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("disabled"); + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("disabled").create(); assertDecodesRaw(compressionCodec); // ensure true pass-through when compression is disabled, even if the payload looks like ZSTD @@ -42,7 +42,7 @@ public void testDisabledCompressionCodecDecodes() throws Exception { @Test public void testDisabledCompressionCodecEncodes() throws Exception { - final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("disabled"); + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("disabled").create(); // ensure true pass-through when compression is disabled assertThat(compressionCodec.encode(RAW_BYTES.bytes()), is(equalTo(RAW_BYTES.bytes()))); } @@ -50,13 +50,13 @@ public void testDisabledCompressionCodecEncodes() throws Exception { @Test public void testDisabledCompressionCodecLogging() throws Exception { final Logger mockLogger = Mockito.mock(Logger.class); - CompressionCodec.fromConfigValue("disabled", mockLogger); + CompressionCodec.fromConfigValue("disabled", mockLogger).create(); Mockito.verify(mockLogger).warn(argThat(stringContainsInOrder("compression support", "disabled"))); } @Test public void testNoneCompressionCodecDecodes() throws Exception { - final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("none"); + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("none").create(); assertDecodesRaw(compressionCodec); assertDecodesDeflateAnyLevel(compressionCodec); 
assertDecodesOutputOfAllKnownCompressionCodecs(compressionCodec); @@ -64,20 +64,20 @@ public void testNoneCompressionCodecDecodes() throws Exception { @Test public void testNoneCompressionCodecEncodes() throws Exception { - final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("none"); + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("none").create(); assertThat(compressionCodec.encode(RAW_BYTES.bytes()), is(equalTo(RAW_BYTES.bytes()))); } @Test public void testNoneCompressionCodecLogging() throws Exception { final Logger mockLogger = Mockito.mock(Logger.class); - CompressionCodec.fromConfigValue("none", mockLogger); + CompressionCodec.fromConfigValue("none", mockLogger).create(); Mockito.verify(mockLogger).info(argThat(stringContainsInOrder("compression support", "enabled", "read-only"))); } @Test public void testSpeedCompressionCodecDecodes() throws Exception { - final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("speed"); + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("speed").create(); assertDecodesRaw(compressionCodec); assertDecodesDeflateAnyLevel(compressionCodec); assertDecodesOutputOfAllKnownCompressionCodecs(compressionCodec); @@ -85,20 +85,20 @@ public void testSpeedCompressionCodecDecodes() throws Exception { @Test public void testSpeedCompressionCodecEncodes() throws Exception { - final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("speed"); + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("speed").create(); assertEncodesSmallerRoundTrip(compressionCodec); } @Test public void testSpeedCompressionCodecLogging() throws Exception { final Logger mockLogger = Mockito.mock(Logger.class); - CompressionCodec.fromConfigValue("speed", mockLogger); + CompressionCodec.fromConfigValue("speed", mockLogger).create(); Mockito.verify(mockLogger).info(argThat(stringContainsInOrder("compression support", "enabled", 
"speed"))); } @Test public void testBalancedCompressionCodecDecodes() throws Exception { - final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("balanced"); + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("balanced").create(); assertDecodesRaw(compressionCodec); assertDecodesDeflateAnyLevel(compressionCodec); assertDecodesOutputOfAllKnownCompressionCodecs(compressionCodec); @@ -106,20 +106,20 @@ public void testBalancedCompressionCodecDecodes() throws Exception { @Test public void testBalancedCompressionCodecEncodes() throws Exception { - final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("balanced"); + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("balanced").create(); assertEncodesSmallerRoundTrip(compressionCodec); } @Test public void testBalancedCompressionCodecLogging() throws Exception { final Logger mockLogger = Mockito.mock(Logger.class); - CompressionCodec.fromConfigValue("balanced", mockLogger); + CompressionCodec.fromConfigValue("balanced", mockLogger).create(); Mockito.verify(mockLogger).info(argThat(stringContainsInOrder("compression support", "enabled", "balanced"))); } @Test public void testSizeCompressionCodecDecodes() throws Exception { - final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("size"); + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("size").create(); assertDecodesRaw(compressionCodec); assertDecodesDeflateAnyLevel(compressionCodec); assertDecodesOutputOfAllKnownCompressionCodecs(compressionCodec); @@ -127,14 +127,14 @@ public void testSizeCompressionCodecDecodes() throws Exception { @Test public void testSizeCompressionCodecEncodes() throws Exception { - final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("size"); + final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("size").create(); assertEncodesSmallerRoundTrip(compressionCodec); } 
@Test public void testSizeCompressionCodecLogging() throws Exception { final Logger mockLogger = Mockito.mock(Logger.class); - CompressionCodec.fromConfigValue("size", mockLogger); + CompressionCodec.fromConfigValue("size", mockLogger).create(); Mockito.verify(mockLogger).info(argThat(stringContainsInOrder("compression support", "enabled", "size"))); } diff --git a/logstash-core/src/test/java/org/logstash/instrument/metrics/VisibilityUtil.java b/logstash-core/src/test/java/org/logstash/instrument/metrics/VisibilityUtil.java new file mode 100644 index 00000000000..f252fae7766 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/instrument/metrics/VisibilityUtil.java @@ -0,0 +1,13 @@ +package org.logstash.instrument.metrics; + +import org.logstash.instrument.metrics.timer.TimerMetric; + +import java.util.function.LongSupplier; + +public class VisibilityUtil { + private VisibilityUtil() {} + + public static UptimeMetric createUptimeMetric(String name, LongSupplier nanoTimeSupplier) { + return new UptimeMetric(name, nanoTimeSupplier); + } +} diff --git a/qa/integration/specs/monitoring_api_spec.rb b/qa/integration/specs/monitoring_api_spec.rb index 4c3e43a8245..d7da245552d 100644 --- a/qa/integration/specs/monitoring_api_spec.rb +++ b/qa/integration/specs/monitoring_api_spec.rb @@ -215,6 +215,11 @@ logstash_service.start_with_stdin logstash_service.wait_for_logstash + number_of_events.times { + logstash_service.write_to_stdin("Testing flow metrics") + sleep(1) + } + Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do # node_stats can fail if the stats subsystem isn't ready result = logstash_service.monitoring_api.node_stats rescue nil @@ -234,6 +239,9 @@ expect(queue_capacity_stats["page_capacity_in_bytes"]).not_to be_nil expect(queue_capacity_stats["max_queue_size_in_bytes"]).not_to be_nil expect(queue_capacity_stats["max_unread_events"]).not_to be_nil + queue_compression_stats = queue_stats.fetch("compression") + 
expect(queue_compression_stats.dig('decode', 'ratio', 'lifetime')).to be >= 1 + expect(queue_compression_stats.dig('decode', 'spend', 'lifetime')).not_to be_nil else expect(queue_stats["type"]).to eq("memory") end