diff --git a/config/logstash.yml b/config/logstash.yml
index e4d008ca5cd..4a5be0cb23b 100644
--- a/config/logstash.yml
+++ b/config/logstash.yml
@@ -223,6 +223,10 @@
#
# queue.checkpoint.writes: 1024
#
+# If using queue.type: persisted, the compression goal. Valid values are `none`, `speed`, `balanced`, and `size`.
+# The default `none` does not compress new events, but can still read events that were written with compression enabled.
+#
+# queue.compression: none
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
diff --git a/docker/data/logstash/env2yaml/env2yaml.go b/docker/data/logstash/env2yaml/env2yaml.go
index 95fc569b236..d1e976bbab1 100644
--- a/docker/data/logstash/env2yaml/env2yaml.go
+++ b/docker/data/logstash/env2yaml/env2yaml.go
@@ -58,6 +58,7 @@ var validSettings = []string{
"queue.checkpoint.acks",
"queue.checkpoint.writes",
"queue.checkpoint.interval", // remove it for #17155
+ "queue.compression",
"queue.drain",
"dead_letter_queue.enable",
"dead_letter_queue.max_bytes",
diff --git a/docs/reference/logstash-settings-file.md b/docs/reference/logstash-settings-file.md
index 7bcce3c853a..419e1e3f5ae 100644
--- a/docs/reference/logstash-settings-file.md
+++ b/docs/reference/logstash-settings-file.md
@@ -68,6 +68,7 @@ The `logstash.yml` file includes these settings.
| `queue.checkpoint.acks` | The maximum number of ACKed events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.acks: 0` to set this value to unlimited. | 1024 |
| `queue.checkpoint.writes` | The maximum number of written events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.writes: 0` to set this value to unlimited. | 1024 |
| `queue.checkpoint.retry` | When enabled, Logstash will retry four times per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on Windows platform, filesystems with non-standard behavior such as SANs and is not recommended except in those specific circumstances. (`queue.type: persisted`) | `true` |
+| `queue.compression` {applies_to}`stack: ga 9.2` | Sets the persisted queue compression goal, which reduces the size of events on disk at the cost of additional CPU usage. Possible values are `none`, `speed`, `balanced`, and `size`. | `none` |
| `queue.drain` | When enabled, Logstash waits until the persistent queue (`queue.type: persisted`) is drained before shutting down. | `false` |
| `dead_letter_queue.enable` | Flag to instruct Logstash to enable the DLQ feature supported by plugins. | `false` |
| `dead_letter_queue.max_bytes` | The maximum size of each dead letter queue. Entries will be dropped if they would increase the size of the dead letter queue beyond this setting. | `1024mb` |
diff --git a/docs/reference/persistent-queues.md b/docs/reference/persistent-queues.md
index 8c2bed6639d..b10c1179817 100644
--- a/docs/reference/persistent-queues.md
+++ b/docs/reference/persistent-queues.md
@@ -84,6 +84,17 @@ If you want to define values for a specific pipeline, use [`pipelines.yml`](/ref
`queue.checkpoint.interval` {applies_to}`stack: deprecated 9.1`
: Sets the interval in milliseconds when a checkpoint is forced on the head page. Default is `1000`. Set to `0` to eliminate periodic checkpoints.
+`queue.compression` {applies_to}`stack: ga 9.2`
+: Sets the event compression goal for the persistent queue (PQ). Default is `none`. Possible values are:
+ * `none`: does not compress new events, but can still read compressed events
+ * `speed`: optimize for fastest compression operation
+ * `size`: optimize for smallest possible size on disk, spending more CPU
+ * `balanced`: a balance between the `speed` and `size` settings
+:::{important}
+Compression can be enabled for an existing PQ, but once compressed events have been written to a PQ, that PQ can no longer be read by earlier Logstash releases that do not support compression.
+If you need to downgrade Logstash after enabling compression, first either delete the PQ or run the pipeline with `queue.drain: true` to ensure that no compressed events remain.
+:::
+
## Configuration notes [pq-config-notes]
Every situation and environment is different, and the "ideal" configuration varies. If you optimize for performance, you may increase your risk of losing data. If you optimize for data protection, you may impact performance.
diff --git a/logstash-core/build.gradle b/logstash-core/build.gradle
index 5c0db0f5a3f..a45e168f008 100644
--- a/logstash-core/build.gradle
+++ b/logstash-core/build.gradle
@@ -239,6 +239,7 @@ dependencies {
implementation 'commons-codec:commons-codec:1.17.0' // transitively required by httpclient
// Jackson version moved to versions.yml in the project root (the JrJackson version is there too)
implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
+ implementation "com.github.luben:zstd-jni:1.5.7-4"
api "com.fasterxml.jackson.core:jackson-databind:${jacksonDatabindVersion}"
api "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}"
implementation 'org.codehaus.janino:janino:3.1.0'
diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb
index 49978fad31a..2874c3d6e70 100644
--- a/logstash-core/lib/logstash/environment.rb
+++ b/logstash-core/lib/logstash/environment.rb
@@ -95,6 +95,7 @@ def self.as_java_range(r)
Setting::NumericSetting.new("queue.checkpoint.writes", 1024), # 0 is unlimited
Setting::NumericSetting.new("queue.checkpoint.interval", 1000), # remove it for #17155
Setting::BooleanSetting.new("queue.checkpoint.retry", true),
+ Setting::StringSetting.new("queue.compression", "none", true, %w(none speed balanced size disabled)),
Setting::BooleanSetting.new("dead_letter_queue.enable", false),
Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"),
Setting::NumericSetting.new("dead_letter_queue.flush_interval", 5000),
diff --git a/logstash-core/lib/logstash/settings.rb b/logstash-core/lib/logstash/settings.rb
index e90d5523317..78b63810a7a 100644
--- a/logstash-core/lib/logstash/settings.rb
+++ b/logstash-core/lib/logstash/settings.rb
@@ -69,6 +69,7 @@ def self.included(base)
"queue.checkpoint.interval", # remove it for #17155
"queue.checkpoint.writes",
"queue.checkpoint.retry",
+ "queue.compression",
"queue.drain",
"queue.max_bytes",
"queue.max_events",
diff --git a/logstash-core/spec/logstash/queue_factory_spec.rb b/logstash-core/spec/logstash/queue_factory_spec.rb
index 540113412c4..5d8dd24a229 100644
--- a/logstash-core/spec/logstash/queue_factory_spec.rb
+++ b/logstash-core/spec/logstash/queue_factory_spec.rb
@@ -30,6 +30,7 @@
LogStash::Setting::NumericSetting.new("queue.checkpoint.acks", 1024),
LogStash::Setting::NumericSetting.new("queue.checkpoint.writes", 1024),
LogStash::Setting::BooleanSetting.new("queue.checkpoint.retry", false),
+ LogStash::Setting::StringSetting.new("queue.compression", "none", true, %w(none speed balanced size disabled)),
LogStash::Setting::StringSetting.new("pipeline.id", pipeline_id),
LogStash::Setting::PositiveIntegerSetting.new("pipeline.batch.size", 125),
LogStash::Setting::PositiveIntegerSetting.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum)
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java
new file mode 100644
index 00000000000..1cb54c98a77
--- /dev/null
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java
@@ -0,0 +1,41 @@
+package org.logstash.ackedqueue;
+
+import com.github.luben.zstd.Zstd;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Subclasses of {@link AbstractZstdAwareCompressionCodec} are {@link CompressionCodec}s that are capable
+ * of detecting and decompressing zstd-compressed events. When decoding byte sequences that are NOT
+ * zstd-compressed, the given bytes are emitted verbatim.
+ */
+abstract class AbstractZstdAwareCompressionCodec implements CompressionCodec {
+ // log from the concrete class
+ protected final Logger logger = LogManager.getLogger(this.getClass());
+
+ @Override
+ public byte[] decode(byte[] data) {
+ if (!isZstd(data)) {
+ return data;
+ }
+ try {
+ final byte[] decoded = Zstd.decompress(data);
+ logger.trace("decoded {} -> {}", data.length, decoded.length);
+ return decoded;
+ } catch (Exception e) {
+ throw new RuntimeException("Exception while decoding", e);
+ }
+ }
+
+ private static final byte[] ZSTD_FRAME_MAGIC = { (byte) 0x28, (byte) 0xB5, (byte) 0x2F, (byte) 0xFD };
+
+ static boolean isZstd(byte[] data) {
+ if (data.length < 4) { return false; }
+
+ for (int i = 0; i < 4; i++) {
+ if (data[i] != ZSTD_FRAME_MAGIC[i]) { return false; }
+ }
+
+ return true;
+ }
+}
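The pass-through behavior above keys entirely on the 4-byte zstd frame magic (`0x28 0xB5 0x2F 0xFD`). A minimal sketch of that check, assuming only `Zstd.compress` and the package-private `isZstd` helper added here; the class name is invented:

```java
package org.logstash.ackedqueue;

import com.github.luben.zstd.Zstd;

// Raw payloads fail the magic check and are passed through verbatim, while
// anything produced by Zstd.compress starts with 0x28 0xB5 0x2F 0xFD.
class ZstdMagicSketch {
    public static void main(String[] args) {
        byte[] raw = "plain serialized event bytes".getBytes();
        byte[] compressed = Zstd.compress(raw, 3);

        System.out.println(AbstractZstdAwareCompressionCodec.isZstd(raw));        // false
        System.out.println(AbstractZstdAwareCompressionCodec.isZstd(compressed)); // true
    }
}
```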
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java
new file mode 100644
index 00000000000..b1f99cf9980
--- /dev/null
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java
@@ -0,0 +1,58 @@
+package org.logstash.ackedqueue;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public interface CompressionCodec {
+ Logger LOGGER = LogManager.getLogger(CompressionCodec.class);
+
+ byte[] encode(byte[] data);
+ byte[] decode(byte[] data);
+
+ /**
+ * The {@link CompressionCodec#NOOP} is a {@link CompressionCodec} that
+ * does nothing when encoding and decoding. It is only meant to be activated
+ * as a safety-latch in the event of compression being broken.
+ */
+ CompressionCodec NOOP = new CompressionCodec() {
+ @Override
+ public byte[] encode(byte[] data) {
+ return data;
+ }
+
+ @Override
+ public byte[] decode(byte[] data) {
+ return data;
+ }
+ };
+
+ static CompressionCodec fromConfigValue(final String configValue) {
+ return fromConfigValue(configValue, LOGGER);
+ }
+
+ static CompressionCodec fromConfigValue(final String configValue, final Logger logger) {
+ return switch (configValue) {
+ case "disabled" -> {
+ logger.warn("compression support has been disabled");
+ yield CompressionCodec.NOOP;
+ }
+ case "none" -> {
+ logger.info("compression support is enabled (read-only)");
+ yield ZstdAwareCompressionCodec.getInstance();
+ }
+ case "speed" -> {
+ logger.info("compression support is enabled (goal: speed)");
+ yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.SPEED);
+ }
+ case "balanced" -> {
+ logger.info("compression support is enabled (goal: balanced)");
+ yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.BALANCED);
+ }
+ case "size" -> {
+ logger.info("compression support is enabled (goal: size)");
+ yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.SIZE);
+ }
+ default -> throw new IllegalArgumentException(String.format("Unsupported compression setting `%s`", configValue));
+ };
+ }
+}
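How the factory and codecs are meant to compose, as a minimal round-trip sketch; it assumes only the APIs added in this diff (`fromConfigValue`, `encode`, `decode`), and the payload and class name are illustrative:

```java
package org.logstash.ackedqueue;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class CompressionRoundTripSketch {
    public static void main(String[] args) {
        byte[] original = "a serialized event payload".getBytes(StandardCharsets.UTF_8);

        // writer side: a queue configured with `queue.compression: balanced`
        CompressionCodec writer = CompressionCodec.fromConfigValue("balanced");
        byte[] onDisk = writer.encode(original);

        // reader side: the default `none` writes uncompressed but still decodes
        // zstd frames written earlier, so mixed queues remain readable
        CompressionCodec reader = CompressionCodec.fromConfigValue("none");
        System.out.println(Arrays.equals(original, reader.decode(onDisk))); // true
    }
}
```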
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
index 691987793c5..ceace485888 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
@@ -83,6 +83,7 @@ public final class Queue implements Closeable {
// deserialization
private final Class<? extends Queueable> elementClass;
private final Method deserializeMethod;
+ private final CompressionCodec compressionCodec;
// thread safety
private final ReentrantLock lock = new ReentrantLock();
@@ -112,6 +113,7 @@ public Queue(Settings settings) {
this.maxBytes = settings.getQueueMaxBytes();
this.checkpointIO = new FileCheckpointIO(dirPath, settings.getCheckpointRetry());
this.elementClass = settings.getElementClass();
+ this.compressionCodec = settings.getCompressionCodec();
this.tailPages = new ArrayList<>();
this.unreadTailPages = new ArrayList<>();
this.closed = new AtomicBoolean(true); // not yet opened
@@ -414,7 +416,8 @@ public long write(Queueable element) throws IOException {
throw new QueueRuntimeException(QueueExceptionMessages.CANNOT_WRITE_TO_CLOSED_QUEUE);
}
- byte[] data = element.serialize();
+ byte[] serializedBytes = element.serialize();
+ byte[] data = compressionCodec.encode(serializedBytes);
// the write strategy with regard to the isFull() state is to assume there is space for this element
// and write it, then after write verify if we just filled the queue and wait on the notFull condition
@@ -767,7 +770,8 @@ public CheckpointIO getCheckpointIO() {
*/
public Queueable deserialize(byte[] bytes) {
try {
- return (Queueable)this.deserializeMethod.invoke(this.elementClass, bytes);
+ byte[] decodedBytes = compressionCodec.decode(bytes);
+ return (Queueable)this.deserializeMethod.invoke(this.elementClass, decodedBytes);
} catch (IllegalAccessException|InvocationTargetException e) {
throw new QueueRuntimeException("deserialize invocation error", e);
}
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java
index 6a10c2a3e7f..f437c1898ba 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java
@@ -24,6 +24,9 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.jruby.Ruby;
import org.jruby.RubyBasicObject;
import org.jruby.RubyClass;
@@ -63,6 +66,8 @@ public final class QueueFactoryExt extends RubyBasicObject {
private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = LogManager.getLogger(QueueFactoryExt.class);
+
public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@@ -123,6 +128,13 @@ private static Settings extractQueueSettings(final IRubyObject settings) {
.checkpointMaxAcks(getSetting(context, settings, QUEUE_CHECKPOINT_ACKS).toJava(Integer.class))
.checkpointRetry(getSetting(context, settings, QUEUE_CHECKPOINT_RETRY).isTrue())
.queueMaxBytes(getSetting(context, settings, QUEUE_MAX_BYTES).toJava(Integer.class))
+ .compressionCodec(extractConfiguredCodec(settings))
.build();
}
+
+ private static CompressionCodec extractConfiguredCodec(final IRubyObject settings) {
+ final ThreadContext context = settings.getRuntime().getCurrentContext();
+ final String compressionSetting = getSetting(context, settings, QUEUE_COMPRESSION).asJavaString();
+ return CompressionCodec.fromConfigValue(compressionSetting, LOGGER);
+ }
}
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java
index 1623738659e..9c404ef1fc5 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java
@@ -44,6 +44,8 @@ public interface Settings {
boolean getCheckpointRetry();
+ CompressionCodec getCompressionCodec();
+
/**
* Validate and return the settings, or throw descriptive {@link QueueRuntimeException}
* @param settings the settings to validate
@@ -89,6 +91,8 @@ interface Builder {
Builder checkpointRetry(boolean checkpointRetry);
+ Builder compressionCodec(CompressionCodec compressionCodec);
+
Settings build();
}
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java b/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java
index bc191f44a32..923217af366 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java
@@ -31,6 +31,7 @@ public class SettingsImpl implements Settings {
private final int checkpointMaxAcks;
private final int checkpointMaxWrites;
private final boolean checkpointRetry;
+ private final CompressionCodec compressionCodec;
public static Builder builder(final Settings settings) {
return new BuilderImpl(settings);
@@ -49,6 +50,7 @@ private SettingsImpl(final BuilderImpl builder) {
this.checkpointMaxAcks = builder.checkpointMaxAcks;
this.checkpointMaxWrites = builder.checkpointMaxWrites;
this.checkpointRetry = builder.checkpointRetry;
+ this.compressionCodec = builder.compressionCodec;
}
@Override
@@ -91,6 +93,11 @@ public boolean getCheckpointRetry() {
return this.checkpointRetry;
}
+ @Override
+ public CompressionCodec getCompressionCodec() {
+ return this.compressionCodec;
+ }
+
/**
* Default implementation for Setting's Builder
* */
@@ -140,6 +147,8 @@ private static final class BuilderImpl implements Builder {
private boolean checkpointRetry;
+ private CompressionCodec compressionCodec;
+
private BuilderImpl(final String dirForFiles) {
this.dirForFiles = dirForFiles;
this.elementClass = null;
@@ -148,6 +157,7 @@ private BuilderImpl(final String dirForFiles) {
this.maxUnread = DEFAULT_MAX_UNREAD;
this.checkpointMaxAcks = DEFAULT_CHECKPOINT_MAX_ACKS;
this.checkpointMaxWrites = DEFAULT_CHECKPOINT_MAX_WRITES;
+ this.compressionCodec = CompressionCodec.NOOP;
this.checkpointRetry = false;
}
@@ -160,6 +170,7 @@ private BuilderImpl(final Settings settings) {
this.checkpointMaxAcks = settings.getCheckpointMaxAcks();
this.checkpointMaxWrites = settings.getCheckpointMaxWrites();
this.checkpointRetry = settings.getCheckpointRetry();
+ this.compressionCodec = settings.getCompressionCodec();
}
@Override
@@ -204,6 +215,12 @@ public Builder checkpointRetry(final boolean checkpointRetry) {
return this;
}
+ @Override
+ public Builder compressionCodec(CompressionCodec compressionCodec) {
+ this.compressionCodec = compressionCodec;
+ return this;
+ }
+
@Override
public Settings build() {
return Settings.ensureValid(new SettingsImpl(this));
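A hedged sketch of how the new builder hook composes with the existing copy-builder, using only methods visible in this diff (`SettingsImpl.builder(Settings)`, `compressionCodec(...)`, `build()`); the helper class and the source of `existing` are assumptions:

```java
package org.logstash.ackedqueue;

class QueueSettingsSketch {
    // Derive a copy of existing queue settings whose writes are compressed with
    // the `size` goal; all other settings are carried over by the copy-builder.
    static Settings withSizeCompression(Settings existing) {
        return SettingsImpl.builder(existing)
                .compressionCodec(CompressionCodec.fromConfigValue("size"))
                .build();
    }
}
```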
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdAwareCompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdAwareCompressionCodec.java
new file mode 100644
index 00000000000..f82b4b75f2e
--- /dev/null
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdAwareCompressionCodec.java
@@ -0,0 +1,18 @@
+package org.logstash.ackedqueue;
+
+/**
+ * A {@link ZstdAwareCompressionCodec} is a {@link CompressionCodec} that can decode zstd-compressed
+ * bytes, but performs no compression when encoding.
+ */
+class ZstdAwareCompressionCodec extends AbstractZstdAwareCompressionCodec {
+ private static final ZstdAwareCompressionCodec INSTANCE = new ZstdAwareCompressionCodec();
+
+ static ZstdAwareCompressionCodec getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public byte[] encode(byte[] data) {
+ return data;
+ }
+}
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java
new file mode 100644
index 00000000000..fa5a22b3ee6
--- /dev/null
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ZstdEnabledCompressionCodec.java
@@ -0,0 +1,41 @@
+package org.logstash.ackedqueue;
+
+import com.github.luben.zstd.Zstd;
+
+/**
+ * A {@link ZstdEnabledCompressionCodec} is a {@link CompressionCodec} that can decode zstd-compressed
+ * bytes and performs zstd compression when encoding.
+ */
+class ZstdEnabledCompressionCodec extends AbstractZstdAwareCompressionCodec implements CompressionCodec {
+ public enum Goal {
+ FASTEST(-7),
+ SPEED(-1),
+ BALANCED(3),
+ HIGH(14),
+ SIZE(22),
+ ;
+
+ private final int internalLevel;
+
+ Goal(final int internalLevel) {
+ this.internalLevel = internalLevel;
+ }
+ }
+
+ private final int internalLevel;
+
+ ZstdEnabledCompressionCodec(final Goal goal) {
+ this.internalLevel = goal.internalLevel;
+ }
+
+ @Override
+ public byte[] encode(byte[] data) {
+ try {
+ final byte[] encoded = Zstd.compress(data, internalLevel);
+ logger.trace("encoded {} -> {}", data.length, encoded.length);
+ return encoded;
+ } catch (Exception e) {
+ throw new RuntimeException("Exception while encoding", e);
+ }
+ }
+}
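For context on the `Goal` mapping (SPEED = -1, BALANCED = 3, SIZE = 22), a small sketch that exercises the same zstd-jni calls used above across those levels to show the size/CPU trade-off; the class name and payload are made up:

```java
package org.logstash.ackedqueue;

import com.github.luben.zstd.Zstd;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class ZstdGoalSketch {
    public static void main(String[] args) {
        byte[] payload = "repeated event payload text ".repeat(64)
                .getBytes(StandardCharsets.UTF_8);

        for (int level : new int[]{-1, 3, 22}) { // SPEED, BALANCED, SIZE
            byte[] compressed = Zstd.compress(payload, level);
            boolean roundTrip = Arrays.equals(payload, Zstd.decompress(compressed));
            System.out.printf("level %3d: %d -> %d bytes, round trip %b%n",
                    level, payload.length, compressed.length, roundTrip);
        }
    }
}
```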
diff --git a/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java b/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java
index 7c5c47e3162..be113bd0a2f 100644
--- a/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java
+++ b/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java
@@ -43,4 +43,6 @@ public class SettingKeyDefinitions {
public static final String QUEUE_CHECKPOINT_RETRY = "queue.checkpoint.retry";
public static final String QUEUE_MAX_BYTES = "queue.max_bytes";
+
+ public static final String QUEUE_COMPRESSION = "queue.compression";
}
\ No newline at end of file
diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java
new file mode 100644
index 00000000000..d1a154bd3d3
--- /dev/null
+++ b/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java
@@ -0,0 +1,239 @@
+package org.logstash.ackedqueue;
+
+import com.github.luben.zstd.Zstd;
+import org.apache.logging.log4j.Logger;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.security.SecureRandom;
+import java.util.Arrays;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.*;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Matchers.argThat;
+
+public class CompressionCodecTest {
+ static final ImmutableByteArrayBarrier RAW_BYTES = new ImmutableByteArrayBarrier((
+ "this is a string of text with repeated substrings that is designed to be "+
+ "able to be compressed into a string that is smaller than the original input "+
+ "so that we can assert that the compression codecs compress strings to be "+
+ "smaller than their uncompressed representations").getBytes());
+ static final ImmutableByteArrayBarrier COMPRESSED_MINIMAL = new ImmutableByteArrayBarrier(compress(RAW_BYTES.bytes(), -1));
+ static final ImmutableByteArrayBarrier COMPRESSED_DEFAULT = new ImmutableByteArrayBarrier(compress(RAW_BYTES.bytes(), 3));
+ static final ImmutableByteArrayBarrier COMPRESSED_MAXIMUM = new ImmutableByteArrayBarrier(compress(RAW_BYTES.bytes(), 22));
+
+ private final CompressionCodec codecDisabled = CompressionCodec.fromConfigValue("disabled");
+ private final CompressionCodec codecNone = CompressionCodec.fromConfigValue("none");
+ private final CompressionCodec codecSpeed = CompressionCodec.fromConfigValue("speed");
+ private final CompressionCodec codecBalanced = CompressionCodec.fromConfigValue("balanced");
+ private final CompressionCodec codecSize = CompressionCodec.fromConfigValue("size");
+
+ @Test
+ public void testDisabledCompressionCodecDecodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("disabled");
+ assertDecodesRaw(compressionCodec);
+
+ // ensure true pass-through when compression is disabled, even if the payload looks like ZSTD
+ assertThat(compressionCodec.decode(COMPRESSED_MINIMAL.bytes()), is(equalTo(COMPRESSED_MINIMAL.bytes())));
+ assertThat(compressionCodec.decode(COMPRESSED_DEFAULT.bytes()), is(equalTo(COMPRESSED_DEFAULT.bytes())));
+ assertThat(compressionCodec.decode(COMPRESSED_MAXIMUM.bytes()), is(equalTo(COMPRESSED_MAXIMUM.bytes())));
+ }
+
+ @Test
+ public void testDisabledCompressionCodecEncodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("disabled");
+ // ensure true pass-through when compression is disabled
+ assertThat(compressionCodec.encode(RAW_BYTES.bytes()), is(equalTo(RAW_BYTES.bytes())));
+ }
+
+ @Test
+ public void testDisabledCompressionCodecLogging() throws Exception {
+ final Logger mockLogger = Mockito.mock(Logger.class);
+ CompressionCodec.fromConfigValue("disabled", mockLogger);
+ Mockito.verify(mockLogger).warn(argThat(stringContainsInOrder("compression support", "disabled")));
+ }
+
+ @Test
+ public void testNoneCompressionCodecDecodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("none");
+ assertDecodesRaw(compressionCodec);
+ assertDecodesZstdAnyLevel(compressionCodec);
+ assertDecodesOutputOfAllKnownCompressionCodecs(compressionCodec);
+ }
+
+ @Test
+ public void testNoneCompressionCodecEncodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("none");
+ assertThat(compressionCodec.encode(RAW_BYTES.bytes()), is(equalTo(RAW_BYTES.bytes())));
+ }
+
+ @Test
+ public void testNoneCompressionCodecLogging() throws Exception {
+ final Logger mockLogger = Mockito.mock(Logger.class);
+ CompressionCodec.fromConfigValue("none", mockLogger);
+ Mockito.verify(mockLogger).info(argThat(stringContainsInOrder("compression support", "enabled", "read-only")));
+ }
+
+ @Test
+ public void testSpeedCompressionCodecDecodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("speed");
+ assertDecodesRaw(compressionCodec);
+ assertDecodesZstdAnyLevel(compressionCodec);
+ assertDecodesOutputOfAllKnownCompressionCodecs(compressionCodec);
+ }
+
+ @Test
+ public void testSpeedCompressionCodecEncodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("speed");
+ assertEncodesSmallerRoundTrip(compressionCodec);
+ }
+
+ @Test
+ public void testSpeedCompressionCodecLogging() throws Exception {
+ final Logger mockLogger = Mockito.mock(Logger.class);
+ CompressionCodec.fromConfigValue("speed", mockLogger);
+ Mockito.verify(mockLogger).info(argThat(stringContainsInOrder("compression support", "enabled", "speed")));
+ }
+
+ @Test
+ public void testBalancedCompressionCodecDecodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("balanced");
+ assertDecodesRaw(compressionCodec);
+ assertDecodesZstdAnyLevel(compressionCodec);
+ assertDecodesOutputOfAllKnownCompressionCodecs(compressionCodec);
+ }
+
+ @Test
+ public void testBalancedCompressionCodecEncodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("balanced");
+ assertEncodesSmallerRoundTrip(compressionCodec);
+ }
+
+ @Test
+ public void testBalancedCompressionCodecLogging() throws Exception {
+ final Logger mockLogger = Mockito.mock(Logger.class);
+ CompressionCodec.fromConfigValue("balanced", mockLogger);
+ Mockito.verify(mockLogger).info(argThat(stringContainsInOrder("compression support", "enabled", "balanced")));
+ }
+
+ @Test
+ public void testSizeCompressionCodecDecodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("size");
+ assertDecodesRaw(compressionCodec);
+ assertDecodesZstdAnyLevel(compressionCodec);
+ assertDecodesOutputOfAllKnownCompressionCodecs(compressionCodec);
+ }
+
+ @Test
+ public void testSizeCompressionCodecEncodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("size");
+ assertEncodesSmallerRoundTrip(compressionCodec);
+ }
+
+ @Test
+ public void testSizeCompressionCodecLogging() throws Exception {
+ final Logger mockLogger = Mockito.mock(Logger.class);
+ CompressionCodec.fromConfigValue("size", mockLogger);
+ Mockito.verify(mockLogger).info(argThat(stringContainsInOrder("compression support", "enabled", "size")));
+ }
+
+ @Test(timeout=1000)
+ public void testCompressionCodecDecodeTailTruncated() throws Exception {
+ final byte[] truncatedInput = copyWithTruncatedTail(COMPRESSED_DEFAULT.bytes(), 32);
+
+ final RuntimeException thrownException = assertThrows(RuntimeException.class, () -> codecNone.decode(truncatedInput));
+ assertThat(thrownException.getMessage(), containsString("Exception while decoding"));
+ final Throwable rootCause = extractRootCause(thrownException);
+ assertThat(rootCause.getMessage(), containsString("Data corruption detected"));
+ }
+
+ byte[] copyWithTruncatedTail(final byte[] input, final int tailSize) {
+ int startIndex = (input.length < tailSize) ? 0 : input.length - tailSize;
+
+ final byte[] result = Arrays.copyOf(input, input.length);
+ Arrays.fill(result, startIndex, result.length, (byte) 0);
+
+ return result;
+ }
+
+ @Test(timeout=1000)
+ public void testCompressionCodecDecodeTailScrambled() throws Exception {
+ final byte[] scrambledInput = copyWithScrambledTail(COMPRESSED_DEFAULT.bytes(), 32);
+
+ final RuntimeException thrownException = assertThrows(RuntimeException.class, () -> codecNone.decode(scrambledInput));
+ assertThat(thrownException.getMessage(), containsString("Exception while decoding"));
+ final Throwable rootCause = extractRootCause(thrownException);
+ assertThat(rootCause.getMessage(), anyOf(containsString("Data corruption detected"), containsString("Destination buffer is too small")));
+ }
+
+ byte[] copyWithScrambledTail(final byte[] input, final int tailSize) {
+ final SecureRandom secureRandom = new SecureRandom();
+ int startIndex = (input.length < tailSize) ? 0 : input.length - tailSize;
+
+ byte[] randomBytes = new byte[input.length - startIndex];
+ secureRandom.nextBytes(randomBytes);
+
+ final byte[] result = Arrays.copyOf(input, input.length);
+ System.arraycopy(randomBytes, 0, result, startIndex, randomBytes.length);
+
+ return result;
+ }
+
+ @Test(timeout=1000)
+ public void testCompressionDecodeTailNullPadded() throws Exception {
+ final byte[] nullPaddedInput = copyWithNullPaddedTail(COMPRESSED_DEFAULT.bytes(), 32);
+
+ final RuntimeException thrownException = assertThrows(RuntimeException.class, () -> codecNone.decode(nullPaddedInput));
+ assertThat(thrownException.getMessage(), containsString("Exception while decoding"));
+ final Throwable rootCause = extractRootCause(thrownException);
+ assertThat(rootCause.getMessage(), anyOf(containsString("Unknown frame descriptor"), containsString("Data corruption detected")));
+ }
+
+ byte[] copyWithNullPaddedTail(final byte[] input, final int tailSize) {
+ return Arrays.copyOf(input, Math.addExact(input.length, tailSize));
+ }
+
+ Throwable extractRootCause(final Throwable throwable) {
+ Throwable current;
+ Throwable cause = throwable;
+ do {
+ current = cause;
+ cause = current.getCause();
+ } while (cause != null && cause != current);
+ return current;
+ }
+
+ void assertDecodesRaw(final CompressionCodec codec) {
+ assertThat(codec.decode(RAW_BYTES.bytes()), is(equalTo(RAW_BYTES.bytes())));
+ }
+
+ void assertDecodesZstdAnyLevel(final CompressionCodec codec) {
+ // zstd levels range from -7 to 22.
+ for (int level = -7; level <= 22; level++) {
+ final byte[] compressed = compress(RAW_BYTES.bytes(), level);
+ assertThat(String.format("zstd level %s (%s bytes)", level, compressed.length), codec.decode(compressed), is(equalTo(RAW_BYTES.bytes())));
+ }
+ }
+
+ void assertDecodesOutputOfAllKnownCompressionCodecs(final CompressionCodec codec) {
+ assertThat(codec.decode(codecDisabled.encode(RAW_BYTES.bytes())), is(equalTo(RAW_BYTES.bytes())));
+ assertThat(codec.decode(codecNone.encode(RAW_BYTES.bytes())), is(equalTo(RAW_BYTES.bytes())));
+ assertThat(codec.decode(codecSpeed.encode(RAW_BYTES.bytes())), is(equalTo(RAW_BYTES.bytes())));
+ assertThat(codec.decode(codecBalanced.encode(RAW_BYTES.bytes())), is(equalTo(RAW_BYTES.bytes())));
+ assertThat(codec.decode(codecSize.encode(RAW_BYTES.bytes())), is(equalTo(RAW_BYTES.bytes())));
+ }
+
+ void assertEncodesSmallerRoundTrip(final CompressionCodec codec) {
+ final byte[] input = RAW_BYTES.bytes();
+
+ final byte[] encoded = codec.encode(input);
+ assertThat("encoded is smaller", encoded.length, is(lessThan(input.length)));
+ assertThat("shaped like zstd", AbstractZstdAwareCompressionCodec.isZstd(encoded), is(true));
+ assertThat("round trip decode", codec.decode(encoded), is(equalTo(input)));
+ }
+
+ public static byte[] compress(byte[] input, int level) {
+ return Zstd.compress(input, level);
+ }
+}
\ No newline at end of file
diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/ImmutableByteArrayBarrier.java b/logstash-core/src/test/java/org/logstash/ackedqueue/ImmutableByteArrayBarrier.java
new file mode 100644
index 00000000000..276e4d97dba
--- /dev/null
+++ b/logstash-core/src/test/java/org/logstash/ackedqueue/ImmutableByteArrayBarrier.java
@@ -0,0 +1,20 @@
+package org.logstash.ackedqueue;
+
+import java.util.Arrays;
+
+/**
+ * An {@link ImmutableByteArrayBarrier} provides an immutability shield around a {@code byte[]}.
+ * It stores an inaccessible copy of the provided {@code byte[]}, and makes copies of that copy
+ * available via {@link ImmutableByteArrayBarrier#bytes}.
+ * @param bytes the byte array
+ */
+public record ImmutableByteArrayBarrier(byte[] bytes) {
+ public ImmutableByteArrayBarrier(byte[] bytes) {
+ this.bytes = Arrays.copyOf(bytes, bytes.length);
+ }
+
+ @Override
+ public byte[] bytes() {
+ return Arrays.copyOf(bytes, bytes.length);
+ }
+}
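A tiny usage sketch of the defensive-copy guarantee the record provides; the class below is illustrative only:

```java
package org.logstash.ackedqueue;

class BarrierUsageSketch {
    public static void main(String[] args) {
        byte[] source = {1, 2, 3};
        ImmutableByteArrayBarrier barrier = new ImmutableByteArrayBarrier(source);

        source[0] = 99;          // mutating the caller's array has no effect
        barrier.bytes()[1] = 99; // mutating a returned copy has no effect either

        byte[] fresh = barrier.bytes();
        System.out.println(fresh[0] + " " + fresh[1] + " " + fresh[2]); // 1 2 3
    }
}
```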
diff --git a/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.md b/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.md
new file mode 100644
index 00000000000..6daecabb144
--- /dev/null
+++ b/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.md
@@ -0,0 +1,141 @@
+# Summary
+
+The Logstash data directory contains a queue for the `main` pipeline, containing:
+
+ - ACK'd events (from a page that is not fully-ack'd)
+ - raw CBOR-encoded events
+ - zstd-compressed events with different compression goals
+
+# Pages
+~~~
+1 258 821AACAC page.0 CBOR(stringref)
+2 343 3BE717E8 page.0 CBOR(stringref)
+3 332 3439807A page.0 CBOR(stringref)
+4 258 C04209D4 page.0 CBOR(stringref)
+5 343 3DFB08E8 page.0 CBOR(stringref)
+6 332 44B0315D page.0 CBOR(stringref)
+7 258 90D25985 page.0 CBOR(stringref)
+8 343 DAFD5712 page.0 CBOR(stringref)
+9 332 AB6A81DF page.0 CBOR(stringref)
+10 258 157EA7A6 page.0 CBOR(stringref)
+11 258 02C0F7A2 page.0 CBOR(stringref)
+12 343 0005E8A8 page.0 CBOR(stringref)
+13 332 C2DA39EA page.1 CBOR(stringref)
+14 258 377D623C page.1 CBOR(stringref)
+15 343 9F76657C page.1 CBOR(stringref)
+16 332 50B51A98 page.1 CBOR(stringref)
+17 258 827848CC page.1 CBOR(stringref)
+18 343 8325D121 page.1 CBOR(stringref)
+19 332 E1A1378B page.1 CBOR(stringref)
+20 258 1BBDAA1A page.1 CBOR(stringref)
+21 254 19C85DF6 page.1 ZSTD(258)
+22 317 AD5DC7CC page.1 ZSTD(343)
+23 325 BB8CE48C page.1 ZSTD(332)
+24 254 27D38856 page.1 ZSTD(258)
+25 317 67A7D2F3 page.2 ZSTD(343)
+26 325 888AF9B2 page.2 ZSTD(332)
+27 254 CAA2FDE3 page.2 ZSTD(258)
+28 317 2985771A page.2 ZSTD(343)
+29 325 89197F51 page.2 ZSTD(332)
+30 254 A9E292EE page.2 ZSTD(258)
+31 258 243FC2C1 page.2 CBOR(stringref)
+32 219 2E2E0BDF page.2 ZSTD(258)
+33 261 5ED17F40 page.2 ZSTD(343)
+34 280 86BA1E80 page.2 ZSTD(332)
+35 218 6A7B8C41 page.2 ZSTD(258)
+36 262 08E69C4C page.2 ZSTD(343)
+37 277 CD32DEBD page.2 ZSTD(332)
+38 218 43101D61 page.2 ZSTD(258)
+39 261 A22033DE page.3 ZSTD(343)
+40 279 8F1FE0FA page.3 ZSTD(332)
+41 218 FF56D05C page.3 ZSTD(258)
+42 258 7077981D page.3 CBOR(stringref)
+43 343 7748A127 page.3 CBOR(stringref)
+44 332 B4A0C82C page.3 CBOR(stringref)
+45 258 96FB0308 page.3 CBOR(stringref)
+46 343 40B77975 page.3 CBOR(stringref)
+47 332 D5571FDC page.3 CBOR(stringref)
+48 258 BF3FC517 page.3 CBOR(stringref)
+49 343 1BC62146 page.3 CBOR(stringref)
+50 332 418FD829 page.3 CBOR(stringref)
+51 258 DB40747E page.3 CBOR(stringref)
+52 224 7629AF30 page.4 ZSTD(258)
+53 264 D450FC21 page.4 ZSTD(343)
+54 284 43F91F18 page.4 ZSTD(332)
+55 224 C61DB7BA page.4 ZSTD(258)
+56 264 F9547DBC page.4 ZSTD(343)
+57 281 3DBB71E5 page.4 ZSTD(332)
+58 225 8ACDB484 page.4 ZSTD(258)
+59 264 8256E2D2 page.4 ZSTD(343)
+60 281 D76156A2 page.4 ZSTD(332)
+61 225 EDC6147B page.4 ZSTD(258)
+62 258 D3AB1EF4 page.4 CBOR(stringref)
+63 220 4851D677 page.4 ZSTD(258)
+64 225 C8DCE54A page.4 ZSTD(258)
+65 251 3D1E0F5F page.4 ZSTD(258)
+66 258 1C5637CB page.4 CBOR(stringref)
+67 343 09AE6714 page.5 CBOR(stringref)
+68 332 4A97AC77 page.5 CBOR(stringref)
+69 254 D1E43C69 page.5 ZSTD(258)
+70 317 B6A2361D page.5 ZSTD(343)
+71 325 A44CE35F page.5 ZSTD(332)
+72 225 B69C7923 page.5 ZSTD(258)
+73 265 FEBC2D45 page.5 ZSTD(343)
+74 286 5FA5C389 page.5 ZSTD(332)
+75 221 C36048C0 page.5 ZSTD(258)
+76 262 E988C90B page.5 ZSTD(343)
+77 280 6C98308C page.5 ZSTD(332)
+~~~
+
+# CHECKPOINTS
+
+~~~
+# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.0
+VERSION [ 0001]: 1
+PAGENUM [ 00000000]: 0
+1UNAKPG [ 00000000]: 0
+1UNAKSQ [0000000000000005]: 5
+MINSEQN [0000000000000001]: 1
+ELEMNTS [ 0000000C]: 12
+CHECKSM [ 4AFA3119]
+# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.1
+VERSION [ 0001]: 1
+PAGENUM [ 00000001]: 1
+1UNAKPG [ 00000000]: 0
+1UNAKSQ [000000000000000D]: 13
+MINSEQN [000000000000000D]: 13
+ELEMNTS [ 0000000C]: 12
+CHECKSM [ 70829F7B]
+# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.2
+VERSION [ 0001]: 1
+PAGENUM [ 00000002]: 2
+1UNAKPG [ 00000000]: 0
+1UNAKSQ [0000000000000019]: 25
+MINSEQN [0000000000000019]: 25
+ELEMNTS [ 0000000E]: 14
+CHECKSM [ 4ABFB50A]
+# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.3
+VERSION [ 0001]: 1
+PAGENUM [ 00000003]: 3
+1UNAKPG [ 00000000]: 0
+1UNAKSQ [0000000000000027]: 39
+MINSEQN [0000000000000027]: 39
+ELEMNTS [ 0000000D]: 13
+CHECKSM [ 95B393C6]
+# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.4
+VERSION [ 0001]: 1
+PAGENUM [ 00000004]: 4
+1UNAKPG [ 00000000]: 0
+1UNAKSQ [0000000000000034]: 52
+MINSEQN [0000000000000034]: 52
+ELEMNTS [ 0000000F]: 15
+CHECKSM [ 9B602904]
+# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.head
+VERSION [ 0001]: 1
+PAGENUM [ 00000005]: 5
+1UNAKPG [ 00000000]: 0
+1UNAKSQ [0000000000000043]: 67
+MINSEQN [0000000000000043]: 67
+ELEMNTS [ 0000000B]: 11
+CHECKSM [ B5F33B10]
+~~~
\ No newline at end of file
diff --git a/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.tar.gz b/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.tar.gz
new file mode 100644
index 00000000000..53a0c4f9851
Binary files /dev/null and b/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.tar.gz differ
diff --git a/qa/integration/fixtures/pq_drain_spec.yml b/qa/integration/fixtures/pq_drain_spec.yml
new file mode 100644
index 00000000000..9b135a29e70
--- /dev/null
+++ b/qa/integration/fixtures/pq_drain_spec.yml
@@ -0,0 +1,3 @@
+---
+services:
+ - logstash
\ No newline at end of file
diff --git a/qa/integration/specs/pq_drain_spec.rb b/qa/integration/specs/pq_drain_spec.rb
new file mode 100644
index 00000000000..b524d7547b8
--- /dev/null
+++ b/qa/integration/specs/pq_drain_spec.rb
@@ -0,0 +1,142 @@
+# Licensed to Elasticsearch B.V. under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Elasticsearch B.V. licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require_relative '../framework/fixture'
+require_relative '../framework/settings'
+require_relative '../services/logstash_service'
+require_relative '../framework/helpers'
+require "logstash/devutils/rspec/spec_helper"
+
+require 'ostruct'
+require 'shellwords'
+require 'stud/temporary'
+
+if ENV['FEATURE_FLAG'] == 'persistent_queues'
+ describe "Test logstash queue draining" do
+ before(:all) { @fixture = Fixture.new(__FILE__) }
+ after(:all) { @fixture&.teardown }
+
+ let(:logstash_service) { @fixture.get_service("logstash") }
+
+ shared_examples 'pq drain' do |queue_compression_setting|
+ let(:settings_flags) { super().merge('queue.drain' => true) }
+
+ around(:each) do |example|
+ Stud::Temporary.directory('data') do |tempdir|
+ # expand the fixture tarball into our temp data dir
+ data_dir_tarball = File.join(__dir__, '../fixtures/data_dirs/mixed-compression-queue-data-dir.tar.gz')
+ `tar --directory #{Shellwords.escape(tempdir)} --strip-components 1 -xzf "#{Shellwords.escape(data_dir_tarball)}"`
+
+ @data_dir = tempdir
+ example.call
+ end
+ end
+
+ around(:each) do |example|
+ Stud::Temporary.file('output') do |tempfile|
+ @output_file = tempfile.path
+ example.call
+ end
+ end
+
+ let(:pipeline) do
+ <<~PIPELINE
+ input { generator { count => 1 type => seed } }
+ output { file { path => "#{@output_file}" codec => json_lines } }
+ PIPELINE
+ end
+
+ it "reads the contents of the PQ and drains" do
+
+ unacked_queued_elements = Pathname.new(@data_dir).glob('queue/main/checkpoint*').map { |cpf| decode_checkpoint(cpf) }
+ .map { |cp| (cp.elements - (cp.first_unacked_seq - cp.min_sequence)) }.reduce(&:+)
+
+ invoke_args = %W(
+ --log.level=debug
+ --path.settings=#{File.dirname(logstash_service.application_settings_file)}
+ --path.data=#{@data_dir}
+ --pipeline.workers=2
+ --pipeline.batch.size=8
+ --config.string=#{pipeline}
+ )
+ invoke_args << "-Squeue.compression=#{queue_compression_setting}" unless queue_compression_setting.nil?
+
+ status = logstash_service.run(*invoke_args)
+
+ aggregate_failures('process output') do
+ expect(status.exit_code).to be_zero
+ expect(status.stderr_and_stdout).to include("queue.type: persisted")
+ expect(status.stderr_and_stdout).to include("queue.drain: true")
+ expect(status.stderr_and_stdout).to include("queue.compression: #{queue_compression_setting}") unless queue_compression_setting.nil?
+ end
+
+ aggregate_failures('processing result') do
+ # count the events, make sure they're all the right shape.
+ expect(::File::size(@output_file)).to_not be_zero
+
+ written_events = ::File::read(@output_file).lines.map { |line| JSON.load(line) }
+ expect(written_events.size).to eq(unacked_queued_elements + 1)
+ timestamps = written_events.map {|event| event['@timestamp'] }
+ expect(timestamps.uniq.size).to eq(written_events.size)
+ end
+
+ aggregate_failures('resulting queue state') do
+ # glob the data dir and make sure things have been cleaned up.
+ # we should only have a head checkpoint and a single fully-acked page.
+ checkpoints = Pathname.new(@data_dir).glob('queue/main/checkpoint*')
+ expect(checkpoints.size).to eq(1)
+ expect(checkpoints.first.basename.to_s).to eq('checkpoint.head')
+ checkpoint = decode_checkpoint(checkpoints.first)
+ expect(checkpoint.first_unacked_page).to eq(checkpoint.page_number)
+ expect(checkpoint.first_unacked_seq).to eq(checkpoint.min_sequence + checkpoint.elements)
+
+ pages = Pathname.new(@data_dir).glob('queue/main/page*')
+ expect(pages.size).to eq(1)
+ end
+ end
+ end
+
+ context "`queue.compression` setting" do
+ %w(none speed balanced size).each do |explicit_compression_setting|
+ context "explicit `#{explicit_compression_setting}`" do
+ include_examples 'pq drain', explicit_compression_setting
+ end
+ end
+ context "default setting" do
+ include_examples 'pq drain', nil
+ end
+ end
+ end
+
+ def decode_checkpoint(path)
+ bytes = path.read(encoding: 'BINARY').bytes
+
+ bstoi = -> (bs) { bs.reduce(0) {|m,b| (m<<8)+b } }
+
+ version = bstoi[bytes.slice(0,2)]
+ pagenum = bstoi[bytes.slice(2,4)]
+ first_unacked_page = bstoi[bytes.slice(6,4)]
+ first_unacked_seq = bstoi[bytes.slice(10,8)]
+ min_sequence = bstoi[bytes.slice(18,8)]
+ elements = bstoi[bytes.slice(26,4)]
+
+ OpenStruct.new(version: version,
+ page_number: pagenum,
+ first_unacked_page: first_unacked_page,
+ first_unacked_seq: first_unacked_seq,
+ min_sequence: min_sequence,
+ elements: elements)
+ end
+end
\ No newline at end of file
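For reference, the fixed-offset checkpoint layout that `decode_checkpoint` reads, sketched in Java with `ByteBuffer` (big-endian, matching the `(m << 8) + b` folding in the Ruby helper); the path handling is illustrative:

```java
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;

class CheckpointLayoutSketch {
    public static void main(String[] args) throws Exception {
        // ByteBuffer defaults to big-endian, matching the Ruby helper's accumulation
        ByteBuffer buf = ByteBuffer.wrap(Files.readAllBytes(Path.of(args[0])));
        System.out.println("version            = " + buf.getShort(0)); // 2 bytes
        System.out.println("page number        = " + buf.getInt(2));   // 4 bytes
        System.out.println("first unacked page = " + buf.getInt(6));   // 4 bytes
        System.out.println("first unacked seq  = " + buf.getLong(10)); // 8 bytes
        System.out.println("min sequence num   = " + buf.getLong(18)); // 8 bytes
        System.out.println("element count      = " + buf.getInt(26));  // 4 bytes
    }
}
```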
diff --git a/tools/dependencies-report/src/main/resources/licenseMapping.csv b/tools/dependencies-report/src/main/resources/licenseMapping.csv
index 128eef59960..14fb7c8c28a 100644
--- a/tools/dependencies-report/src/main/resources/licenseMapping.csv
+++ b/tools/dependencies-report/src/main/resources/licenseMapping.csv
@@ -33,6 +33,7 @@ dependency,dependencyUrl,licenseOverride,copyright,sourceURL
"com.fasterxml.jackson.core:jackson-databind:",https://github.com/FasterXML/jackson-databind,Apache-2.0
"com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:",https://github.com/FasterXML/jackson-dataformats-binary,Apache-2.0
"com.fasterxml.jackson.module:jackson-module-afterburner:",https://github.com/FasterXML/jackson-modules-base,Apache-2.0
+"com.github.luben:zstd-jni:1.5.7-4",https://github.com/luben/zstd-jni,BSD-2-Clause
"com.google.googlejavaformat:google-java-format:",https://github.com/google/google-java-format,Apache-2.0
"com.google.guava:guava:",https://github.com/google/guava,Apache-2.0
"com.google.j2objc:j2objc-annotations:",https://github.com/google/j2objc/,Apache-2.0
diff --git a/tools/dependencies-report/src/main/resources/notices/com.github.luben!zstd-jni-NOTICE.txt b/tools/dependencies-report/src/main/resources/notices/com.github.luben!zstd-jni-NOTICE.txt
new file mode 100644
index 00000000000..4accd5fd41e
--- /dev/null
+++ b/tools/dependencies-report/src/main/resources/notices/com.github.luben!zstd-jni-NOTICE.txt
@@ -0,0 +1,26 @@
+source: https://github.com/luben/zstd-jni/blob/v1.5.7-4/LICENSE
+
+Copyright (c) 2015-present, Luben Karavelov/ All rights reserved.
+
+BSD-2-Clause License https://opensource.org/license/bsd-2-clause
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice, this
+ list of conditions and the following disclaimer in the documentation and/or
+ other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file