diff --git a/config/logstash.yml b/config/logstash.yml
index e4d008ca5cd..4a5be0cb23b 100644
--- a/config/logstash.yml
+++ b/config/logstash.yml
@@ -223,6 +223,10 @@
#
# queue.checkpoint.writes: 1024
#
+# If using queue.type: persisted, the compression goal. Valid values are `none`, `speed`, `balanced`, and `size`.
+# The default `none` does not compress new events, but can still read events that were previously written compressed.
+#
+# queue.compression: none
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
diff --git a/docker/data/logstash/env2yaml/env2yaml.go b/docker/data/logstash/env2yaml/env2yaml.go
index 95fc569b236..d1e976bbab1 100644
--- a/docker/data/logstash/env2yaml/env2yaml.go
+++ b/docker/data/logstash/env2yaml/env2yaml.go
@@ -58,6 +58,7 @@ var validSettings = []string{
"queue.checkpoint.acks",
"queue.checkpoint.writes",
"queue.checkpoint.interval", // remove it for #17155
+ "queue.compression",
"queue.drain",
"dead_letter_queue.enable",
"dead_letter_queue.max_bytes",
diff --git a/docs/reference/logstash-settings-file.md b/docs/reference/logstash-settings-file.md
index 7bcce3c853a..b14ac26f13c 100644
--- a/docs/reference/logstash-settings-file.md
+++ b/docs/reference/logstash-settings-file.md
@@ -68,6 +68,7 @@ The `logstash.yml` file includes these settings.
| `queue.checkpoint.acks` | The maximum number of ACKed events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.acks: 0` to set this value to unlimited. | 1024 |
| `queue.checkpoint.writes` | The maximum number of written events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.writes: 0` to set this value to unlimited. | 1024 |
| `queue.checkpoint.retry` | When enabled, Logstash will retry four times per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on Windows platform, filesystems with non-standard behavior such as SANs and is not recommended except in those specific circumstances. (`queue.type: persisted`) | `true` |
+| `queue.compression` | Set a persisted queue compression goal, which allows the pipeline to spend CPU to reduce the serialized size on disk. Acceptable values are `none`, `speed`, `balanced`, and `size`. | `none` |
| `queue.drain` | When enabled, Logstash waits until the persistent queue (`queue.type: persisted`) is drained before shutting down. | `false` |
| `dead_letter_queue.enable` | Flag to instruct Logstash to enable the DLQ feature supported by plugins. | `false` |
| `dead_letter_queue.max_bytes` | The maximum size of each dead letter queue. Entries will be dropped if they would increase the size of the dead letter queue beyond this setting. | `1024mb` |
diff --git a/docs/reference/persistent-queues.md b/docs/reference/persistent-queues.md
index 5a3bc71b0d6..070df0e4b4f 100644
--- a/docs/reference/persistent-queues.md
+++ b/docs/reference/persistent-queues.md
@@ -81,6 +81,11 @@ If you want to define values for a specific pipeline, use [`pipelines.yml`](/ref
To avoid losing data in the persistent queue, you can set `queue.checkpoint.writes: 1` to force a checkpoint after each event is written. Keep in mind that disk writes have a resource cost. Setting this value to `1` ensures maximum durability, but can severely impact performance. See [Controlling durability](#durability-persistent-queues) to better understand the trade-offs.
+`queue.compression`
+: Sets the event compression goal for use with the persisted queue. Default is `none`, which does not compress new events but can still read events that were previously written compressed. Other acceptable values (see the example below):
+ * `speed`: optimize for fastest compression operation
+ * `size`: optimize for smallest size on disk, spending more CPU
+ * `balanced`: a balance between the `speed` and `size` settings
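+
+  For example, a minimal `logstash.yml` sketch that favors the smallest on-disk size at the cost of more CPU (both settings are shown; only `queue.compression` is new here):
+
+  ```yaml
+  queue.type: persisted
+  queue.compression: size
+  ```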
## Configuration notes [pq-config-notes]
diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb
index b79bd58dcda..9a6d3dfca97 100644
--- a/logstash-core/lib/logstash/environment.rb
+++ b/logstash-core/lib/logstash/environment.rb
@@ -91,6 +91,7 @@ module Environment
Setting::SettingNumeric.new("queue.checkpoint.writes", 1024), # 0 is unlimited
Setting::SettingNumeric.new("queue.checkpoint.interval", 1000), # remove it for #17155
Setting::Boolean.new("queue.checkpoint.retry", true),
+ Setting::SettingString.new("queue.compression", "none", true, %w(none speed balanced size disabled)),
Setting::Boolean.new("dead_letter_queue.enable", false),
Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"),
Setting::SettingNumeric.new("dead_letter_queue.flush_interval", 5000),
diff --git a/logstash-core/lib/logstash/settings.rb b/logstash-core/lib/logstash/settings.rb
index d7feb04b117..a4d6d1487e4 100644
--- a/logstash-core/lib/logstash/settings.rb
+++ b/logstash-core/lib/logstash/settings.rb
@@ -69,6 +69,7 @@ def self.included(base)
"queue.checkpoint.interval", # remove it for #17155
"queue.checkpoint.writes",
"queue.checkpoint.retry",
+ "queue.compression",
"queue.drain",
"queue.max_bytes",
"queue.max_events",
diff --git a/logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb b/logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb
index 0b551494698..ccd37f6bb4e 100644
--- a/logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb
+++ b/logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb
@@ -28,8 +28,17 @@
let(:output_strings) { [] }
let(:reject_memo_keys) { [:reject_memo_keys, :path, :queue, :writer_threads, :collector, :metric, :reader_threads, :output_strings] }
+ let(:queue_settings) do
+ LogStash::AckedQueue.file_settings_builder(path)
+ .capacity(page_capacity)
+ .checkpointMaxAcks(queue_checkpoint_acks)
+ .checkpointMaxWrites(queue_checkpoint_writes)
+ .queueMaxBytes(queue_capacity)
+ .build
+ end
+
let(:queue) do
- described_class.new(path, page_capacity, 0, queue_checkpoint_acks, queue_checkpoint_writes, false, queue_capacity)
+ described_class.new(queue_settings)
end
let(:writer_threads) do
diff --git a/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb b/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb
index 171525a30a3..1c1f9a1fb2d 100644
--- a/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb
+++ b/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb
@@ -125,7 +125,18 @@ def threaded_read_client
context "WrappedAckedQueue" do
let(:path) { Stud::Temporary.directory }
- let(:queue) { LogStash::WrappedAckedQueue.new(path, 1024, 10, 1024, 1024, false, 4096) }
+
+ let(:queue_settings) do
+ LogStash::AckedQueue.file_settings_builder(path)
+ .capacity(1024)
+ .maxUnread(10)
+ .checkpointMaxAcks(1024)
+ .checkpointMaxWrites(1024)
+ .queueMaxBytes(4096)
+ .build
+ end
+
+ let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings) }
before do
read_client.set_events_metric(metric.namespace([:stats, :events]))
diff --git a/logstash-core/spec/logstash/queue_factory_spec.rb b/logstash-core/spec/logstash/queue_factory_spec.rb
index bbc1972d342..9a0b062a4f9 100644
--- a/logstash-core/spec/logstash/queue_factory_spec.rb
+++ b/logstash-core/spec/logstash/queue_factory_spec.rb
@@ -30,6 +30,7 @@
LogStash::Setting::SettingNumeric.new("queue.checkpoint.acks", 1024),
LogStash::Setting::SettingNumeric.new("queue.checkpoint.writes", 1024),
LogStash::Setting::Boolean.new("queue.checkpoint.retry", false),
+ LogStash::Setting::SettingString.new("queue.compression", "none", true, %w(none speed balanced size disabled)),
LogStash::Setting::SettingString.new("pipeline.id", pipeline_id),
LogStash::Setting::PositiveInteger.new("pipeline.batch.size", 125),
LogStash::Setting::PositiveInteger.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum)
diff --git a/logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb b/logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb
index 5c729196c4e..67093a78eea 100644
--- a/logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb
+++ b/logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb
@@ -53,7 +53,18 @@
let(:checkpoint_acks) { 1024 }
let(:checkpoint_writes) { 1024 }
let(:path) { Stud::Temporary.directory }
- let(:queue) { LogStash::WrappedAckedQueue.new(path, page_capacity, max_events, checkpoint_acks, checkpoint_writes, false, max_bytes) }
+
+ let(:queue_settings) do
+ LogStash::AckedQueue.file_settings_builder(path)
+ .capacity(page_capacity)
+ .maxUnread(max_events)
+ .checkpointMaxAcks(checkpoint_acks)
+ .checkpointMaxWrites(checkpoint_writes)
+ .queueMaxBytes(max_bytes)
+ .build
+ end
+
+ let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings) }
after do
queue.close
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractDeflateAwareCompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractDeflateAwareCompressionCodec.java
new file mode 100644
index 00000000000..bed647271c7
--- /dev/null
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/AbstractDeflateAwareCompressionCodec.java
@@ -0,0 +1,120 @@
+package org.logstash.ackedqueue;
+
+import org.logstash.util.SetOnceReference;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+
+/**
+ * Subclasses of {@link AbstractDeflateAwareCompressionCodec} are {@link CompressionCodec}s that are capable
+ * of detecting and decompressing deflate-compressed events. When decoding byte sequences that are NOT
+ * deflate-compressed, the given bytes are emitted verbatim.
+ */
+abstract class AbstractDeflateAwareCompressionCodec implements CompressionCodec {
+
+ static final int BAOS_SHAREABLE_THRESHOLD_BYTES = 4096;
+
+ private final ThreadLocal<BufferedInflater> bufferedInflaterThreadLocal;
+
+ public AbstractDeflateAwareCompressionCodec() {
+ this.bufferedInflaterThreadLocal = ThreadLocal.withInitial(BufferedInflater::new);
+ }
+
+ @Override
+ public byte[] decode(byte[] data) {
+ if (!isDeflate(data)) {
+ return data;
+ }
+ final BufferedInflater bufferedInflater = bufferedInflaterThreadLocal.get();
+ try {
+ return bufferedInflater.decode(data);
+ } catch (IOException e) {
+ throw new RuntimeException("IOException while decoding", e);
+ }
+ }
+
+ static boolean isDeflate(byte[] data) {
+ if (data.length < 2) { return false; }
+
+ // parse first two bytes as big-endian short
+ short header = (short) (((data[0] & 0xFF) << 8) | (data[1] & 0xFF));
+
+ /*
+ * RFC-1950: ZLIB Compressed Data Format Specification version 3.3
+ * https://www.ietf.org/rfc/rfc1950.txt
+ * ┏━━━━ CMF ━━━━━┳━━━━━━━━━━ FLG ━━━━━━━━━━┓
+ * ┠─CINFO─┬──CM──╂─FLEVEL─┬─FDICT─┬─FCHECK─┨
+ * ┃ 0XXX │ 1000 ┃ XX │ 0 │ XXXXX ┃
+ * ┗━━━━━━━┷━━━━━━┻━━━━━━━━┷━━━━━━━┷━━━━━━━━┛
+ * CINFO: 0XXX // always LTE 7 (7 is 32KB window)
+ * CM: 1000 // always 8 for deflate
+ * FDICT: 0 // always unset (no dictionary)
+ *
+ */// 0XXX_1000_XX_0_XXXXX
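+ // worked example: the common zlib header 0x78 0x9C has CMF=0x78 (CINFO=7, CM=8) and FLG=0x9C (FDICT=0),
+ // and 0x789C == 30876 == 31 * 996, so it also satisfies the FCHECK multiple-of-31 test below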
+ short mask = (short) 0b1000_1111_00_1_00000; // bits to keep
+ short flip = (short) 0b0000_1000_00_0_00000; // bits to flip
+ short goal = (short) 0b0000_0000_00_0_00000; // goal state
+ if (((header & mask) ^ flip) != goal) {
+ return false;
+ }
+
+ // additionally the FCHECK ensures that
+ // the big-endian header is a multiple of 31
+ return header % 31 == 0;
+ }
+
+ /**
+ * A {@link BufferedInflater} is a convenience wrapper around the complexities
+ * of managing an {@link Inflater}, an intermediate {@code byte[]} buffer, and
+ * a {@link ByteArrayOutputStream}. It enables internal reuse of small buffers
+ * to reduce allocations.
+ */
+ static class BufferedInflater {
+ private final Inflater inflater;
+ private final byte[] intermediateBuffer;
+ private final SetOnceReference<ByteArrayOutputStream> reusableBaosRef;
+
+ public BufferedInflater() {
+ this.inflater = new Inflater();
+ this.intermediateBuffer = new byte[1024];
+ this.reusableBaosRef = SetOnceReference.unset();
+ }
+
+ public byte[] decode(final byte[] data) throws IOException {
+ final ByteArrayOutputStream baos = getBaos(data.length);
+ try {
+ inflater.setInput(data);
+
+ do {
+ if (inflater.needsInput()) {
+ throw new IOException(String.format("prematurely reached end of encoded value (%s/%s)", inflater.getBytesRead(), inflater.getTotalIn()));
+ }
+ try {
+ int count = inflater.inflate(intermediateBuffer);
+ baos.write(intermediateBuffer, 0, count);
+ } catch (DataFormatException e) {
+ throw new IOException("Failed to decode", e);
+ }
+ } while (!inflater.finished());
+
+ return baos.toByteArray();
+ } finally {
+ inflater.reset();
+ baos.reset();
+ }
+ }
+
+ public void release() {
+ inflater.end();
+ }
+
+ private ByteArrayOutputStream getBaos(final int encodedSize) {
+ if (encodedSize <= BAOS_SHAREABLE_THRESHOLD_BYTES) {
+ return this.reusableBaosRef.offerAndGet(() -> new ByteArrayOutputStream(BAOS_SHAREABLE_THRESHOLD_BYTES));
+ }
+ return new ByteArrayOutputStream(encodedSize);
+ }
+ }
+}
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java
new file mode 100644
index 00000000000..126d82ccb65
--- /dev/null
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java
@@ -0,0 +1,60 @@
+package org.logstash.ackedqueue;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.zip.Deflater;
+
+public interface CompressionCodec {
+ Logger LOGGER = LogManager.getLogger(CompressionCodec.class);
+
+ byte[] encode(byte[] data);
+ byte[] decode(byte[] data);
+
+ /**
+ * The {@link CompressionCodec#NOOP} is a {@link CompressionCodec} that
+ * does nothing when encoding and decoding. It is only meant to be activated
+ * as a safety-latch in the event of compression being broken.
+ */
+ CompressionCodec NOOP = new CompressionCodec() {
+ @Override
+ public byte[] encode(byte[] data) {
+ return data;
+ }
+
+ @Override
+ public byte[] decode(byte[] data) {
+ return data;
+ }
+ };
+
+ static CompressionCodec fromConfigValue(final String configValue) {
+ return fromConfigValue(configValue, LOGGER);
+ }
+
+ static CompressionCodec fromConfigValue(final String configValue, final Logger logger) {
+ return switch (configValue) {
+ case "disabled" -> {
+ logger.warn("compression support has been disabled");
+ yield CompressionCodec.NOOP;
+ }
+ case "none" -> {
+ logger.info("compression support is enabled (read-only)");
+ yield DeflateAwareCompressionCodec.getInstance();
+ }
+ case "speed" -> {
+ logger.info("compression support is enabled (goal: speed)");
+ yield new DeflateEnabledCompressionCodec(Deflater.BEST_SPEED);
+ }
+ case "balanced" -> {
+ logger.info("compression support is enabled (goal: balanced)");
+ yield new DeflateEnabledCompressionCodec(Deflater.DEFAULT_COMPRESSION);
+ }
+ case "size" -> {
+ logger.info("compression support is enabled (goal: size)");
+ yield new DeflateEnabledCompressionCodec(Deflater.BEST_COMPRESSION);
+ }
+ default -> throw new IllegalArgumentException(String.format("Unsupported compression setting `%s`", configValue));
+ };
+ }
+}
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/DeflateAwareCompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/DeflateAwareCompressionCodec.java
new file mode 100644
index 00000000000..1933b453c02
--- /dev/null
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/DeflateAwareCompressionCodec.java
@@ -0,0 +1,18 @@
+package org.logstash.ackedqueue;
+
+/**
+ * A {@link DeflateAwareCompressionCodec} is an {@link CompressionCodec} that can decode deflate-compressed
+ * bytes, but performs no compression when encoding.
+ */
+class DeflateAwareCompressionCodec extends AbstractDeflateAwareCompressionCodec {
+ private static final DeflateAwareCompressionCodec INSTANCE = new DeflateAwareCompressionCodec();
+
+ static DeflateAwareCompressionCodec getInstance() {
+ return INSTANCE;
+ }
+
+ @Override
+ public byte[] encode(byte[] data) {
+ return data;
+ }
+}
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/DeflateEnabledCompressionCodec.java b/logstash-core/src/main/java/org/logstash/ackedqueue/DeflateEnabledCompressionCodec.java
new file mode 100644
index 00000000000..bbbe630e181
--- /dev/null
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/DeflateEnabledCompressionCodec.java
@@ -0,0 +1,80 @@
+package org.logstash.ackedqueue;
+
+import org.logstash.util.SetOnceReference;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HexFormat;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+/**
+ * A {@link DeflateEnabledCompressionCodec} is a {@link CompressionCodec} that can decode deflate-compressed
+ * bytes and performs deflate compression when encoding.
+ */
+class DeflateEnabledCompressionCodec extends AbstractDeflateAwareCompressionCodec implements CompressionCodec {
+
+ private final ThreadLocal<BufferedDeflater> bufferedDeflaterThreadLocal;
+
+ DeflateEnabledCompressionCodec(final int level) {
+ this.bufferedDeflaterThreadLocal = ThreadLocal.withInitial(() -> new BufferedDeflater(level));
+ }
+
+ @Override
+ public byte[] encode(byte[] data) {
+ final BufferedDeflater bufferedDeflater = bufferedDeflaterThreadLocal.get();
+ try {
+ return bufferedDeflater.encode(data);
+ } catch (IOException e) {
+ throw new RuntimeException("IOException while encoding", e);
+ }
+ }
+
+ /**
+ * A {@link BufferedDeflater} is a convenience wrapper around the complexities
+ * of managing a {@link Deflater}, an intermediate {@code byte[]} buffer, and
+ * a {@link ByteArrayOutputStream}. It enables internal reuse of small buffers
+ * to reduce allocations.
+ */
+ static class BufferedDeflater {
+ private final Deflater deflater;
+ private final byte[] intermediateBuffer;
+ private final SetOnceReference<ByteArrayOutputStream> reusableBaosRef;
+
+ public BufferedDeflater(final int level) {
+ this.deflater = new Deflater(level);
+ this.intermediateBuffer = new byte[1024];
+ this.reusableBaosRef = SetOnceReference.unset();
+ }
+
+ public byte[] encode(final byte[] data) throws IOException {
+ final ByteArrayOutputStream baos = getBaos(data.length);
+ try {
+ deflater.setInput(data);
+ deflater.finish();
+
+ while (!deflater.finished()) {
+ int count = deflater.deflate(intermediateBuffer);
+ baos.write(intermediateBuffer, 0, count);
+ }
+ byte[] encodedBytes = baos.toByteArray();
+ assert isDeflate(encodedBytes) : String.format("invalid deflate signature `%s`", HexFormat.of().formatHex(encodedBytes,0,2));
+ return encodedBytes;
+ } finally {
+ deflater.reset();
+ baos.reset();
+ }
+ }
+
+ public void release() {
+ deflater.end();
+ }
+
+ private ByteArrayOutputStream getBaos(final int decodedSize) {
+ if (decodedSize <= BAOS_SHAREABLE_THRESHOLD_BYTES) {
+ return this.reusableBaosRef.offerAndGet(() -> new ByteArrayOutputStream(BAOS_SHAREABLE_THRESHOLD_BYTES));
+ }
+ return new ByteArrayOutputStream(decodedSize);
+ }
+ }
+}
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
index 691987793c5..de76e412b4f 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
@@ -83,6 +83,7 @@ public final class Queue implements Closeable {
// deserialization
 private final Class<? extends Queueable> elementClass;
private final Method deserializeMethod;
+ private final CompressionCodec compressionCodec;
// thread safety
private final ReentrantLock lock = new ReentrantLock();
@@ -112,6 +113,7 @@ public Queue(Settings settings) {
this.maxBytes = settings.getQueueMaxBytes();
this.checkpointIO = new FileCheckpointIO(dirPath, settings.getCheckpointRetry());
this.elementClass = settings.getElementClass();
+ this.compressionCodec = settings.getCompressionCodec();
this.tailPages = new ArrayList<>();
this.unreadTailPages = new ArrayList<>();
this.closed = new AtomicBoolean(true); // not yet opened
@@ -414,7 +416,10 @@ public long write(Queueable element) throws IOException {
throw new QueueRuntimeException(QueueExceptionMessages.CANNOT_WRITE_TO_CLOSED_QUEUE);
}
- byte[] data = element.serialize();
+ byte[] serializedBytes = element.serialize();
+ byte[] data = compressionCodec.encode(serializedBytes);
+
+ logger.trace("serialized: {}->{}", serializedBytes.length, data.length);
// the write strategy with regard to the isFull() state is to assume there is space for this element
// and write it, then after write verify if we just filled the queue and wait on the notFull condition
@@ -767,7 +772,9 @@ public CheckpointIO getCheckpointIO() {
*/
public Queueable deserialize(byte[] bytes) {
try {
- return (Queueable)this.deserializeMethod.invoke(this.elementClass, bytes);
+ byte[] decodedBytes = compressionCodec.decode(bytes);
+ logger.trace("deserialized: {}->{}", bytes.length, decodedBytes.length);
+ return (Queueable)this.deserializeMethod.invoke(this.elementClass, decodedBytes);
} catch (IllegalAccessException|InvocationTargetException e) {
throw new QueueRuntimeException("deserialize invocation error", e);
}
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java
index bcc9b139e58..f437c1898ba 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java
@@ -24,6 +24,9 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.jruby.Ruby;
import org.jruby.RubyBasicObject;
import org.jruby.RubyClass;
@@ -31,12 +34,15 @@
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
+import org.logstash.Event;
import org.logstash.RubyUtil;
import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
import org.logstash.common.SettingKeyDefinitions;
import org.logstash.execution.AbstractWrappedQueueExt;
import org.logstash.ext.JrubyWrappedSynchronousQueueExt;
+import static org.logstash.common.SettingKeyDefinitions.*;
+
/**
* Persistent queue factory JRuby extension.
* */
@@ -60,6 +66,8 @@ public final class QueueFactoryExt extends RubyBasicObject {
private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = LogManager.getLogger(QueueFactoryExt.class);
+
public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@@ -69,10 +77,8 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final
final IRubyObject settings) throws IOException {
final String type = getSetting(context, settings, QUEUE_TYPE_CONTEXT_NAME).asJavaString();
if (PERSISTED_TYPE.equals(type)) {
- final Path queuePath = Paths.get(
- getSetting(context, settings, SettingKeyDefinitions.PATH_QUEUE).asJavaString(),
- getSetting(context, settings, SettingKeyDefinitions.PIPELINE_ID).asJavaString()
- );
+ final Settings queueSettings = extractQueueSettings(settings);
+ final Path queuePath = Paths.get(queueSettings.getDirPath());
// Files.createDirectories raises a FileAlreadyExistsException
// if pipeline queue path is a symlink, so worth checking against Files.exists
@@ -80,18 +86,7 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final
Files.createDirectories(queuePath);
}
- return new JRubyWrappedAckedQueueExt(context.runtime, RubyUtil.WRAPPED_ACKED_QUEUE_CLASS)
- .initialize(
- context, new IRubyObject[]{
- context.runtime.newString(queuePath.toString()),
- getSetting(context, settings, SettingKeyDefinitions.QUEUE_PAGE_CAPACITY),
- getSetting(context, settings, SettingKeyDefinitions.QUEUE_MAX_EVENTS),
- getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_WRITES),
- getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_ACKS),
- getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_RETRY),
- getSetting(context, settings, SettingKeyDefinitions.QUEUE_MAX_BYTES)
- }
- );
+ return JRubyWrappedAckedQueueExt.create(context, queueSettings);
} else if (MEMORY_TYPE.equals(type)) {
return new JrubyWrappedSynchronousQueueExt(
context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS
@@ -118,4 +113,28 @@ private static IRubyObject getSetting(final ThreadContext context, final IRubyOb
final String name) {
return settings.callMethod(context, "get_value", context.runtime.newString(name));
}
+
+ private static Settings extractQueueSettings(final IRubyObject settings) {
+ final ThreadContext context = settings.getRuntime().getCurrentContext();
+ final Path queuePath = Paths.get(
+ getSetting(context, settings, PATH_QUEUE).asJavaString(),
+ getSetting(context, settings, PIPELINE_ID).asJavaString()
+ );
+ return SettingsImpl.fileSettingsBuilder(queuePath.toString())
+ .elementClass(Event.class)
+ .capacity(getSetting(context, settings, QUEUE_PAGE_CAPACITY).toJava(Integer.class))
+ .maxUnread(getSetting(context, settings, QUEUE_MAX_EVENTS).toJava(Integer.class))
+ .checkpointMaxWrites(getSetting(context, settings, QUEUE_CHECKPOINT_WRITES).toJava(Integer.class))
+ .checkpointMaxAcks(getSetting(context, settings, QUEUE_CHECKPOINT_ACKS).toJava(Integer.class))
+ .checkpointRetry(getSetting(context, settings, QUEUE_CHECKPOINT_RETRY).isTrue())
+ .queueMaxBytes(getSetting(context, settings, QUEUE_MAX_BYTES).toJava(Integer.class))
+ .compressionCodec(extractConfiguredCodec(settings))
+ .build();
+ }
+
+ private static CompressionCodec extractConfiguredCodec(final IRubyObject settings) {
+ final ThreadContext context = settings.getRuntime().getCurrentContext();
+ final String compressionSetting = getSetting(context, settings, QUEUE_COMPRESSION).asJavaString();
+ return CompressionCodec.fromConfigValue(compressionSetting, LOGGER);
+ }
}
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java
index 45b34fe0fd0..9c404ef1fc5 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java
@@ -20,6 +20,9 @@
package org.logstash.ackedqueue;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* Persistent queue settings definition.
* */
@@ -41,6 +44,34 @@ public interface Settings {
boolean getCheckpointRetry();
+ CompressionCodec getCompressionCodec();
+
+ /**
+ * Validate and return the settings, or throw descriptive {@link QueueRuntimeException}
+ * @param settings the settings to validate
+ * @return the settings that were provided
+ */
+ static Settings ensureValid(final Settings settings) {
+ final List<String> errors = new ArrayList<>();
+
+ if (settings == null) {
+ errors.add("settings cannot be null");
+ } else {
+ if (settings.getDirPath() == null) {
+ errors.add("dirPath cannot be null");
+ }
+ if (settings.getElementClass() == null) {
+ errors.add("elementClass cannot be null");
+ }
+ }
+
+ if (!errors.isEmpty()) {
+ throw new QueueRuntimeException(String.format("Invalid Queue Settings: %s", errors));
+ }
+
+ return settings;
+ }
+
/**
* Persistent queue Setting's fluent builder definition
* */
@@ -60,6 +91,8 @@ interface Builder {
Builder checkpointRetry(boolean checkpointRetry);
+ Builder compressionCodec(CompressionCodec compressionCodec);
+
Settings build();
}
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java b/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java
index dd23001886f..d55e3698958 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java
@@ -20,6 +20,8 @@
package org.logstash.ackedqueue;
+import java.util.function.Consumer;
+
/**
* Persistent queue settings implementation.
* */
@@ -32,29 +34,26 @@ public class SettingsImpl implements Settings {
private int checkpointMaxAcks;
private int checkpointMaxWrites;
private boolean checkpointRetry;
+ private final CompressionCodec compressionCodec;
public static Builder builder(final Settings settings) {
- return new BuilderImpl(settings.getDirPath(), settings.getElementClass(), settings.getCapacity(),
- settings.getQueueMaxBytes(), settings.getMaxUnread(), settings.getCheckpointMaxAcks(),
- settings.getCheckpointMaxWrites(), settings.getCheckpointRetry()
- );
+ return new BuilderImpl(settings);
}
public static Builder fileSettingsBuilder(final String dirForFiles) {
return new BuilderImpl(dirForFiles);
}
- private SettingsImpl(final String dirForFiles, final Class<? extends Queueable> elementClass,
- final int capacity, final long queueMaxBytes, final int maxUnread,
- final int checkpointMaxAcks, final int checkpointMaxWrites, boolean checkpointRetry) {
- this.dirForFiles = dirForFiles;
- this.elementClass = elementClass;
- this.capacity = capacity;
- this.queueMaxBytes = queueMaxBytes;
- this.maxUnread = maxUnread;
- this.checkpointMaxAcks = checkpointMaxAcks;
- this.checkpointMaxWrites = checkpointMaxWrites;
- this.checkpointRetry = checkpointRetry;
+ private SettingsImpl(final BuilderImpl builder) {
+ this.dirForFiles = builder.dirForFiles;
+ this.elementClass = builder.elementClass;
+ this.capacity = builder.capacity;
+ this.queueMaxBytes = builder.queueMaxBytes;
+ this.maxUnread = builder.maxUnread;
+ this.checkpointMaxAcks = builder.checkpointMaxAcks;
+ this.checkpointMaxWrites = builder.checkpointMaxWrites;
+ this.checkpointRetry = builder.checkpointRetry;
+ this.compressionCodec = builder.compressionCodec;
}
@Override
@@ -97,6 +96,11 @@ public boolean getCheckpointRetry() {
return this.checkpointRetry;
}
+ @Override
+ public CompressionCodec getCompressionCodec() {
+ return this.compressionCodec;
+ }
+
/**
* Default implementation for Setting's Builder
* */
@@ -146,89 +150,113 @@ private static final class BuilderImpl implements Builder {
private final boolean checkpointRetry;
+ private final CompressionCodec compressionCodec;
+
private BuilderImpl(final String dirForFiles) {
- this(dirForFiles, null, DEFAULT_CAPACITY, DEFAULT_MAX_QUEUE_BYTES,
- DEFAULT_MAX_UNREAD, DEFAULT_CHECKPOINT_MAX_ACKS, DEFAULT_CHECKPOINT_MAX_WRITES, false
- );
+ this.dirForFiles = dirForFiles;
+ this.elementClass = null;
+ this.capacity = DEFAULT_CAPACITY;
+ this.queueMaxBytes = DEFAULT_MAX_QUEUE_BYTES;
+ this.maxUnread = DEFAULT_MAX_UNREAD;
+ this.checkpointMaxAcks = DEFAULT_CHECKPOINT_MAX_ACKS;
+ this.checkpointMaxWrites = DEFAULT_CHECKPOINT_MAX_WRITES;
+ this.compressionCodec = CompressionCodec.NOOP;
+ this.checkpointRetry = false;
}
- private BuilderImpl(final String dirForFiles, final Class<? extends Queueable> elementClass,
- final int capacity, final long queueMaxBytes, final int maxUnread,
- final int checkpointMaxAcks, final int checkpointMaxWrites, final boolean checkpointRetry) {
- this.dirForFiles = dirForFiles;
- this.elementClass = elementClass;
- this.capacity = capacity;
- this.queueMaxBytes = queueMaxBytes;
- this.maxUnread = maxUnread;
- this.checkpointMaxAcks = checkpointMaxAcks;
- this.checkpointMaxWrites = checkpointMaxWrites;
- this.checkpointRetry = checkpointRetry;
+ private BuilderImpl(final Settings settings) {
+ this.dirForFiles = settings.getDirPath();
+ this.elementClass = settings.getElementClass();
+ this.capacity = settings.getCapacity();
+ this.queueMaxBytes = settings.getQueueMaxBytes();
+ this.maxUnread = settings.getMaxUnread();
+ this.checkpointMaxAcks = settings.getCheckpointMaxAcks();
+ this.checkpointMaxWrites = settings.getCheckpointMaxWrites();
+ this.checkpointRetry = settings.getCheckpointRetry();
+ this.compressionCodec = settings.getCompressionCodec();
+ }
+
+ private BuilderImpl(final MutableBuilder mutableBuilder) {
+ this.dirForFiles = mutableBuilder.dirForFiles;
+ this.elementClass = mutableBuilder.elementClass;
+ this.capacity = mutableBuilder.capacity;
+ this.queueMaxBytes = mutableBuilder.queueMaxBytes;
+ this.maxUnread = mutableBuilder.maxUnread;
+ this.checkpointMaxAcks = mutableBuilder.checkpointMaxAcks;
+ this.checkpointMaxWrites = mutableBuilder.checkpointMaxWrites;
+ this.checkpointRetry = mutableBuilder.checkpointRetry;
+ this.compressionCodec = mutableBuilder.compressionCodec;
}
@Override
 public Builder elementClass(final Class<? extends Queueable> elementClass) {
- return new BuilderImpl(
- this.dirForFiles, elementClass, this.capacity, this.queueMaxBytes, this.maxUnread,
- this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry
- );
+ return mutate(mutable -> mutable.elementClass = elementClass);
}
@Override
public Builder capacity(final int capacity) {
- return new BuilderImpl(
- this.dirForFiles, this.elementClass, capacity, this.queueMaxBytes, this.maxUnread,
- this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry
- );
+ return mutate(mutable -> mutable.capacity = capacity);
}
@Override
public Builder queueMaxBytes(final long size) {
- return new BuilderImpl(
- this.dirForFiles, this.elementClass, this.capacity, size, this.maxUnread,
- this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry
- );
+ return mutate(mutable -> mutable.queueMaxBytes = size);
}
@Override
public Builder maxUnread(final int maxUnread) {
- return new BuilderImpl(
- this.dirForFiles, this.elementClass,
- this.capacity, this.queueMaxBytes, maxUnread, this.checkpointMaxAcks,
- this.checkpointMaxWrites, this.checkpointRetry
- );
+ return mutate(mutable -> mutable.maxUnread = maxUnread);
}
@Override
public Builder checkpointMaxAcks(final int checkpointMaxAcks) {
- return new BuilderImpl(
- this.dirForFiles, this.elementClass,
- this.capacity, this.queueMaxBytes, this.maxUnread, checkpointMaxAcks,
- this.checkpointMaxWrites, this.checkpointRetry
- );
+ return mutate(mutable -> mutable.checkpointMaxAcks = checkpointMaxAcks);
}
@Override
public Builder checkpointMaxWrites(final int checkpointMaxWrites) {
- return new BuilderImpl(
- this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes,
- this.maxUnread, this.checkpointMaxAcks, checkpointMaxWrites, this.checkpointRetry
- );
+ return mutate(mutable -> mutable.checkpointMaxWrites = checkpointMaxWrites);
}
@Override
public Builder checkpointRetry(final boolean checkpointRetry) {
- return new BuilderImpl(
- this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes,
- this.maxUnread, this.checkpointMaxAcks, checkpointMaxWrites, checkpointRetry
- );
+ return mutate(mutable -> mutable.checkpointRetry = checkpointRetry);
+ }
+
+ @Override
+ public Builder compressionCodec(CompressionCodec compressionCodec) {
+ return mutate(mutable -> mutable.compressionCodec = compressionCodec);
}
@Override
public Settings build() {
- return new SettingsImpl(
- this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes,
- this.maxUnread, this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry
- );
+ return Settings.ensureValid(new SettingsImpl(this));
+ }
+
+ private Builder mutate(final Consumer<MutableBuilder> mutator) {
+ final MutableBuilder mutableBuilder = new MutableBuilder();
+ mutator.accept(mutableBuilder);
+ return mutableBuilder.toBuilder();
+ }
+
+ /**
+ * A {@link MutableBuilder} is an internal detail of the immutable {@link BuilderImpl}
+ * that allows its private {@link BuilderImpl#mutate} to work with a temporary mutable copy.
+ */
+ private class MutableBuilder {
+ protected String dirForFiles = BuilderImpl.this.dirForFiles;
+ protected Class<? extends Queueable> elementClass = BuilderImpl.this.elementClass;
+ protected int capacity = BuilderImpl.this.capacity;
+ protected long queueMaxBytes = BuilderImpl.this.queueMaxBytes;
+ protected int maxUnread = BuilderImpl.this.maxUnread;
+ protected int checkpointMaxAcks = BuilderImpl.this.checkpointMaxAcks;
+ protected int checkpointMaxWrites = BuilderImpl.this.checkpointMaxWrites;
+ protected boolean checkpointRetry = BuilderImpl.this.checkpointRetry;
+ protected CompressionCodec compressionCodec = BuilderImpl.this.compressionCodec;
+
+ Builder toBuilder() {
+ return new BuilderImpl(this);
+ }
}
}
}
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java
index c65cb7c2df3..35cb765dbc9 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java
@@ -31,6 +31,7 @@
import org.jruby.RubyString;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
+import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.Event;
@@ -39,6 +40,7 @@
import org.logstash.ackedqueue.Batch;
import org.logstash.ackedqueue.Queue;
import org.logstash.ackedqueue.QueueExceptionMessages;
+import org.logstash.ackedqueue.Settings;
import org.logstash.ackedqueue.SettingsImpl;
/**
@@ -60,27 +62,27 @@ public Queue getQueue() {
return this.queue;
}
- public static JRubyAckedQueueExt create(String path, int capacity, int maxEvents, int checkpointMaxWrites,
- int checkpointMaxAcks, boolean checkpointRetry, long maxBytes) {
+ public static JRubyAckedQueueExt create(final Settings settings) {
JRubyAckedQueueExt queueExt = new JRubyAckedQueueExt(RubyUtil.RUBY, RubyUtil.ACKED_QUEUE_CLASS);
- queueExt.initializeQueue(path, capacity, maxEvents, checkpointMaxWrites, checkpointMaxAcks, checkpointRetry,
- maxBytes);
+ queueExt.queue = new Queue(settings);
return queueExt;
}
- private void initializeQueue(String path, int capacity, int maxEvents, int checkpointMaxWrites,
- int checkpointMaxAcks, boolean checkpointRetry, long maxBytes) {
- this.queue = new Queue(
- SettingsImpl.fileSettingsBuilder(path)
- .capacity(capacity)
- .maxUnread(maxEvents)
- .queueMaxBytes(maxBytes)
- .checkpointMaxAcks(checkpointMaxAcks)
- .checkpointMaxWrites(checkpointMaxWrites)
- .checkpointRetry(checkpointRetry)
- .elementClass(Event.class)
- .build()
- );
+ /**
+ * Helper method for retrieving a ruby-usable {@link Settings.Builder} with the provided path as its directory,
+ * using {@link Event} as its element-type.
+ *
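+ * <p>For example, a Ruby-side usage sketch (mirroring the updated Ruby specs in this change):
+ * <pre>{@code
+ *   settings = LogStash::AckedQueue.file_settings_builder(path)
+ *                                  .capacity(1024)
+ *                                  .queueMaxBytes(4096)
+ *                                  .build
+ * }</pre>
+ *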
+ * @param context the ruby thread context
+ * @param recv noop receiver (will be rubified LogStash::AckedQueue class)
+ * @param path the path to the queue
+ * @return a ruby-usable proxy for {@link Settings.Builder}
+ */
+ @JRubyMethod(meta = true, name = "file_settings_builder")
+ public static IRubyObject fileSettingsBuilder(final ThreadContext context, IRubyObject recv, final RubyString path) {
+ final Settings.Builder settingsBuilder = SettingsImpl
+ .fileSettingsBuilder(path.asJavaString())
+ .elementClass(Event.class);
+ return JavaUtil.convertJavaToRuby(context.runtime, settingsBuilder);
}
@JRubyMethod(name = "max_unread_events")
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java
index 257ce824214..15b95ec8dea 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java
@@ -25,13 +25,13 @@
import org.jruby.Ruby;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
-import org.jruby.RubyFixnum;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
-import org.jruby.runtime.Arity;
+import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
+import org.logstash.ackedqueue.Settings;
import org.logstash.execution.AbstractWrappedQueueExt;
import org.logstash.execution.QueueReadClientBase;
import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
@@ -49,23 +49,34 @@ public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt {
private JRubyAckedQueueExt queue;
- @JRubyMethod(optional = 8)
- public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] args) throws IOException {
- args = Arity.scanArgs(context.runtime, args, 7, 0);
- int capacity = RubyFixnum.num2int(args[1]);
- int maxEvents = RubyFixnum.num2int(args[2]);
- int checkpointMaxWrites = RubyFixnum.num2int(args[3]);
- int checkpointMaxAcks = RubyFixnum.num2int(args[4]);
- boolean checkpointRetry = !((RubyBoolean) args[5]).isFalse();
- long queueMaxBytes = RubyFixnum.num2long(args[6]);
-
- this.queue = JRubyAckedQueueExt.create(args[0].asJavaString(), capacity, maxEvents,
- checkpointMaxWrites, checkpointMaxAcks, checkpointRetry, queueMaxBytes);
+ @JRubyMethod(required=1)
+ public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject settings) throws IOException {
+ if (JavaUtil.isJavaObject(settings)) {
+ this.queue = JRubyAckedQueueExt.create(JavaUtil.unwrapJavaObject(settings));
+ } else {
+ // We should never get here, but previously had an initialize method
+ // that took 7 technically-optional ordered parameters.
+ throw new IllegalArgumentException(
+ String.format(
+ "Failed to instantiate JRubyWrappedAckedQueueExt with <%s:%s>",
+ settings.getClass().getName(),
+ settings));
+ }
this.queue.open();
return this;
}
+ public static JRubyWrappedAckedQueueExt create(ThreadContext context, Settings settings) throws IOException {
+ return new JRubyWrappedAckedQueueExt(context.runtime, RubyUtil.WRAPPED_ACKED_QUEUE_CLASS, settings);
+ }
+
+ public JRubyWrappedAckedQueueExt(Ruby runtime, RubyClass metaClass, Settings settings) throws IOException {
+ super(runtime, metaClass);
+ this.queue = JRubyAckedQueueExt.create(settings);
+ this.queue.open();
+ }
+
public JRubyWrappedAckedQueueExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
diff --git a/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java b/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java
index 7c5c47e3162..be113bd0a2f 100644
--- a/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java
+++ b/logstash-core/src/main/java/org/logstash/common/SettingKeyDefinitions.java
@@ -43,4 +43,6 @@ public class SettingKeyDefinitions {
public static final String QUEUE_CHECKPOINT_RETRY = "queue.checkpoint.retry";
public static final String QUEUE_MAX_BYTES = "queue.max_bytes";
+
+ public static final String QUEUE_COMPRESSION = "queue.compression";
}
\ No newline at end of file
diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java
new file mode 100644
index 00000000000..d0bc0fc2a57
--- /dev/null
+++ b/logstash-core/src/test/java/org/logstash/ackedqueue/CompressionCodecTest.java
@@ -0,0 +1,255 @@
+package org.logstash.ackedqueue;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.zip.Deflater;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.*;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Matchers.argThat;
+
+public class CompressionCodecTest {
+ static final ImmutableByteArrayBarrier RAW_BYTES = new ImmutableByteArrayBarrier((
+ "this is a string of text with repeated substrings that is designed to be "+
+ "able to be compressed into a string that is smaller than the original input "+
+ "so that we can assert that the compression codecs compress strings to be "+
+ "smaller than their uncompressed representations").getBytes());
+ static final ImmutableByteArrayBarrier DEFLATE_SPEED_BYTES = new ImmutableByteArrayBarrier(deflate(RAW_BYTES.bytes(), Deflater.BEST_SPEED));
+ static final ImmutableByteArrayBarrier DEFLATE_BALANCED_BYTES = new ImmutableByteArrayBarrier(deflate(RAW_BYTES.bytes(), Deflater.DEFAULT_COMPRESSION));
+ static final ImmutableByteArrayBarrier DEFLATE_SIZE_BYTES = new ImmutableByteArrayBarrier(deflate(RAW_BYTES.bytes(), Deflater.BEST_COMPRESSION));
+
+ private final CompressionCodec codecDisabled = CompressionCodec.fromConfigValue("disabled");
+ private final CompressionCodec codecNone = CompressionCodec.fromConfigValue("none");
+ private final CompressionCodec codecSpeed = CompressionCodec.fromConfigValue("speed");
+ private final CompressionCodec codecBalanced = CompressionCodec.fromConfigValue("balanced");
+ private final CompressionCodec codecSize = CompressionCodec.fromConfigValue("size");
+
+ @Test
+ public void testDisabledCompressionCodecDecodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("disabled");
+ assertDecodesRaw(compressionCodec);
+
+ // ensure true pass-through when compression is disabled, even if the payload looks like DEFLATE
+ assertThat(compressionCodec.decode(DEFLATE_SPEED_BYTES.bytes()), is(equalTo(DEFLATE_SPEED_BYTES.bytes())));
+ assertThat(compressionCodec.decode(DEFLATE_BALANCED_BYTES.bytes()), is(equalTo(DEFLATE_BALANCED_BYTES.bytes())));
+ assertThat(compressionCodec.decode(DEFLATE_SIZE_BYTES.bytes()), is(equalTo(DEFLATE_SIZE_BYTES.bytes())));
+ }
+
+ @Test
+ public void testDisabledCompressionCodecEncodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("disabled");
+ // ensure true pass-through when compression is disabled
+ assertThat(compressionCodec.encode(RAW_BYTES.bytes()), is(equalTo(RAW_BYTES.bytes())));
+ }
+
+ @Test
+ public void testDisabledCompressionCodecLogging() throws Exception {
+ final Logger mockLogger = Mockito.mock(Logger.class);
+ CompressionCodec.fromConfigValue("disabled", mockLogger);
+ Mockito.verify(mockLogger).warn(argThat(stringContainsInOrder("compression support", "disabled")));
+ }
+
+ @Test
+ public void testNoneCompressionCodecDecodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("none");
+ assertDecodesRaw(compressionCodec);
+ assertDecodesDeflateAnyLevel(compressionCodec);
+ assertDecodesOutputOfAllKnownCompressionCodecs(compressionCodec);
+ }
+
+ @Test
+ public void testNoneCompressionCodecEncodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("none");
+ assertThat(compressionCodec.encode(RAW_BYTES.bytes()), is(equalTo(RAW_BYTES.bytes())));
+ }
+
+ @Test
+ public void testNoneCompressionCodecLogging() throws Exception {
+ final Logger mockLogger = Mockito.mock(Logger.class);
+ CompressionCodec.fromConfigValue("none", mockLogger);
+ Mockito.verify(mockLogger).info(argThat(stringContainsInOrder("compression support", "enabled", "read-only")));
+ }
+
+ @Test
+ public void testSpeedCompressionCodecDecodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("speed");
+ assertDecodesRaw(compressionCodec);
+ assertDecodesDeflateAnyLevel(compressionCodec);
+ assertDecodesOutputOfAllKnownCompressionCodecs(compressionCodec);
+ }
+
+ @Test
+ public void testSpeedCompressionCodecEncodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("speed");
+ assertEncodesSmallerRoundTrip(compressionCodec);
+ }
+
+ @Test
+ public void testSpeedCompressionCodecLogging() throws Exception {
+ final Logger mockLogger = Mockito.mock(Logger.class);
+ CompressionCodec.fromConfigValue("speed", mockLogger);
+ Mockito.verify(mockLogger).info(argThat(stringContainsInOrder("compression support", "enabled", "speed")));
+ }
+
+ @Test
+ public void testBalancedCompressionCodecDecodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("balanced");
+ assertDecodesRaw(compressionCodec);
+ assertDecodesDeflateAnyLevel(compressionCodec);
+ assertDecodesOutputOfAllKnownCompressionCodecs(compressionCodec);
+ }
+
+ @Test
+ public void testBalancedCompressionCodecEncodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("balanced");
+ assertEncodesSmallerRoundTrip(compressionCodec);
+ }
+
+ @Test
+ public void testBalancedCompressionCodecLogging() throws Exception {
+ final Logger mockLogger = Mockito.mock(Logger.class);
+ CompressionCodec.fromConfigValue("balanced", mockLogger);
+ Mockito.verify(mockLogger).info(argThat(stringContainsInOrder("compression support", "enabled", "balanced")));
+ }
+
+ @Test
+ public void testSizeCompressionCodecDecodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("size");
+ assertDecodesRaw(compressionCodec);
+ assertDecodesDeflateAnyLevel(compressionCodec);
+ assertDecodesOutputOfAllKnownCompressionCodecs(compressionCodec);
+ }
+
+ @Test
+ public void testSizeCompressionCodecEncodes() throws Exception {
+ final CompressionCodec compressionCodec = CompressionCodec.fromConfigValue("size");
+ assertEncodesSmallerRoundTrip(compressionCodec);
+ }
+
+ @Test
+ public void testSizeCompressionCodecLogging() throws Exception {
+ final Logger mockLogger = Mockito.mock(Logger.class);
+ CompressionCodec.fromConfigValue("size", mockLogger);
+ Mockito.verify(mockLogger).info(argThat(stringContainsInOrder("compression support", "enabled", "size")));
+ }
+
+ @Test(timeout=1000)
+ public void testCompressionCodecDecodeTailTruncated() throws Exception {
+ final byte[] truncatedInput = copyWithTruncatedTail(DEFLATE_BALANCED_BYTES.bytes(), 32);
+
+ final RuntimeException thrownException = assertThrows(RuntimeException.class, () -> codecNone.decode(truncatedInput));
+ assertThat(thrownException.getMessage(), containsString("IOException while decoding"));
+ final Throwable rootCause = extractRootCause(thrownException);
+ assertThat(rootCause.getMessage(), anyOf(containsString("prematurely reached end"), containsString("incorrect data check")));
+ }
+
+ byte[] copyWithTruncatedTail(final byte[] input, final int tailSize) {
+ int startIndex = (input.length < tailSize) ? 0 : input.length - tailSize;
+
+ final byte[] result = Arrays.copyOf(input, input.length);
+ Arrays.fill(result, startIndex, result.length, (byte) 0);
+
+ return result;
+ }
+
+ @Test(timeout=1000)
+ public void testCompressionCodecDecodeTailScrambled() throws Exception {
+ final byte[] scrambledInput = copyWithScrambledTail(DEFLATE_BALANCED_BYTES.bytes(), 32);
+
+ final RuntimeException thrownException = assertThrows(RuntimeException.class, () -> codecNone.decode(scrambledInput));
+ assertThat(thrownException.getMessage(), containsString("IOException while decoding"));
+ final Throwable rootCause = extractRootCause(thrownException);
+ assertThat(rootCause.getMessage(), anyOf(containsString("prematurely reached end"), containsString("incorrect data check")));
+ }
+
+ byte[] copyWithScrambledTail(final byte[] input, final int tailSize) {
+ final SecureRandom secureRandom = new SecureRandom();
+ int startIndex = (input.length < tailSize) ? 0 : input.length - tailSize;
+
+ byte[] randomBytes = new byte[input.length - startIndex];
+ secureRandom.nextBytes(randomBytes);
+
+ final byte[] result = Arrays.copyOf(input, input.length);
+ System.arraycopy(randomBytes, 0, result, startIndex, randomBytes.length);
+
+ return result;
+ }
+
+ @Test(timeout=1000)
+ public void testCompressionDecodeTailNullPadded() throws Exception {
+ final byte[] nullPaddedInput = copyWithNullPaddedTail(DEFLATE_BALANCED_BYTES.bytes(), 32);
+
+ assertThat(codecNone.decode(nullPaddedInput), is(equalTo(RAW_BYTES.bytes())));
+ }
+
+ byte[] copyWithNullPaddedTail(final byte[] input, final int tailSize) {
+ return Arrays.copyOf(input, Math.addExact(input.length, tailSize));
+ }
+
+ Throwable extractRootCause(final Throwable throwable) {
+ Throwable current;
+ Throwable cause = throwable;
+ do {
+ current = cause;
+ cause = current.getCause();
+ } while (cause != null && cause != current);
+ return current;
+ }
+
+ void assertDecodesRaw(final CompressionCodec codec) {
+ assertThat(codec.decode(RAW_BYTES.bytes()), is(equalTo(RAW_BYTES.bytes())));
+ }
+
+ void assertDecodesDeflateAnyLevel(final CompressionCodec codec) {
+ final Set<Integer> levels = Set.of(
+ Deflater.DEFAULT_COMPRESSION,
+ Deflater.NO_COMPRESSION,
+ Deflater.BEST_SPEED,
+ 2, 3, 4, 5, 6, 7, 8,
+ Deflater.BEST_COMPRESSION);
+
+ for (int level : levels) {
+ final byte[] deflated = deflate(RAW_BYTES.bytes(), level);
+ assertThat(String.format("deflate level %s (%s bytes)", level, deflated.length), codec.decode(deflated), is(equalTo(RAW_BYTES.bytes())));
+ }
+ }
+
+ void assertDecodesOutputOfAllKnownCompressionCodecs(final CompressionCodec codec) {
+ assertThat(codec.decode(codecDisabled.encode(RAW_BYTES.bytes())), is(equalTo(RAW_BYTES.bytes())));
+ assertThat(codec.decode(codecNone.encode(RAW_BYTES.bytes())), is(equalTo(RAW_BYTES.bytes())));
+ assertThat(codec.decode(codecSpeed.encode(RAW_BYTES.bytes())), is(equalTo(RAW_BYTES.bytes())));
+ assertThat(codec.decode(codecBalanced.encode(RAW_BYTES.bytes())), is(equalTo(RAW_BYTES.bytes())));
+ assertThat(codec.decode(codecSize.encode(RAW_BYTES.bytes())), is(equalTo(RAW_BYTES.bytes())));
+ }
+
+ void assertEncodesSmallerRoundTrip(final CompressionCodec codec) {
+ final byte[] input = RAW_BYTES.bytes();
+
+ final byte[] encoded = codec.encode(input);
+ assertThat("encoded is smaller", encoded.length, is(lessThan(input.length)));
+ assertThat("shaped like deflate", AbstractDeflateAwareCompressionCodec.isDeflate(encoded), is(true));
+ assertThat("round trip decode", codec.decode(encoded), is(equalTo(input)));
+ }
+
+ public static byte[] deflate(byte[] input, int level) {
+ final Deflater deflater = new Deflater(level);
+ try {
+ deflater.setInput(input);
+ deflater.finish();
+
+ // output SHOULD be smaller, and will never be more than 1KB larger than the input
+ byte[] output = new byte[input.length+1024];
+
+ int compressedLength = deflater.deflate(output);
+ return Arrays.copyOf(output, compressedLength);
+ } finally {
+ deflater.end();
+ }
+ }
+}
\ No newline at end of file
diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/ImmutableByteArrayBarrier.java b/logstash-core/src/test/java/org/logstash/ackedqueue/ImmutableByteArrayBarrier.java
new file mode 100644
index 00000000000..276e4d97dba
--- /dev/null
+++ b/logstash-core/src/test/java/org/logstash/ackedqueue/ImmutableByteArrayBarrier.java
@@ -0,0 +1,20 @@
+package org.logstash.ackedqueue;
+
+import java.util.Arrays;
+
+/**
+ * An {@link ImmutableByteArrayBarrier} provides an immutability shield around a {@code byte[]}.
+ * It stores an inaccessible copy of the provided {@code byte[]}, and makes copies of that copy
+ * available via {@link ImmutableByteArrayBarrier#bytes}.
+ * @param bytes the byte array
+ */
+public record ImmutableByteArrayBarrier(byte[] bytes) {
+ public ImmutableByteArrayBarrier(byte[] bytes) {
+ this.bytes = Arrays.copyOf(bytes, bytes.length);
+ }
+
+ @Override
+ public byte[] bytes() {
+ return Arrays.copyOf(bytes, bytes.length);
+ }
+}
diff --git a/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.md b/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.md
new file mode 100644
index 00000000000..76be40c5e4c
--- /dev/null
+++ b/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.md
@@ -0,0 +1,134 @@
+# Summary
+
+The logstash data directory contains a queue containing:
+
+ - ACK'd events (from a page that is not fully-ack'd)
+ - raw CBOR-encoded events
+ - deflate-compressed events with different compression ratios
+
+# Pages
+~~~
+1 386 09303C2B page.0 CBOR(known)
+2 386 60213EC4 page.0 CBOR(known)
+3 386 2B5A03B4 page.0 CBOR(known)
+4 386 0C2A1D63 page.0 CBOR(known)
+5 386 98BF5320 page.0 CBOR(known)
+6 386 12961E51 page.0 CBOR(known)
+7 386 B2B31EB8 page.0 CBOR(known)
+8 386 6C52C3A1 page.0 CBOR(known)
+9 386 A6F18E72 page.0 CBOR(known)
+10 386 6AF34E8E page.0 CBOR(known)
+11 386 C8FD6460 page.1 CBOR(known)
+12 386 17DBA0EC page.1 CBOR(known)
+13 386 E01C3729 page.1 CBOR(known)
+14 386 23356C6B page.1 CBOR(known)
+15 386 285EC9F2 page.1 CBOR(known)
+16 217 3B5E6A30 page.1 DEFLATE(fastest)
+17 219 0BFB8BCF page.1 DEFLATE(fastest)
+18 220 559B446B page.1 DEFLATE(fastest)
+19 219 78FD2D81 page.1 DEFLATE(fastest)
+20 219 B05865D0 page.1 DEFLATE(fastest)
+21 221 8A0C1AE5 page.1 DEFLATE(fastest)
+22 219 90C5CF52 page.1 DEFLATE(fastest)
+23 220 49A5C28A page.1 DEFLATE(fastest)
+24 218 64D07E10 page.2 DEFLATE(fastest)
+25 219 B94B9BA9 page.2 DEFLATE(fastest)
+26 220 FE4A8839 page.2 DEFLATE(fastest)
+27 219 D14C97AC page.2 DEFLATE(fastest)
+28 220 50E7C8DB page.2 DEFLATE(fastest)
+29 220 E92E09D6 page.2 DEFLATE(fastest)
+30 219 8EFDC43D page.2 DEFLATE(fastest)
+31 386 9F8669A9 page.2 CBOR(known)
+32 386 EAB6DC68 page.2 CBOR(known)
+33 386 77BFC64A page.2 CBOR(known)
+34 386 C6DFF1C6 page.2 CBOR(known)
+35 386 03348319 page.2 CBOR(known)
+36 386 3C3AB761 page.2 CBOR(known)
+37 386 AFAE06D9 page.3 CBOR(known)
+38 386 DC9922A6 page.3 CBOR(known)
+39 386 0782934F page.3 CBOR(known)
+40 386 4CAB5FB2 page.3 CBOR(known)
+41 386 EC861477 page.3 CBOR(known)
+42 386 F63FC4D4 page.3 CBOR(known)
+43 386 0652619E page.3 CBOR(known)
+44 386 40544039 page.3 CBOR(known)
+45 386 F90E7EB6 page.3 CBOR(known)
+46 211 F72C0744 page.3 DEFLATE(default)
+47 216 94DC834D page.3 DEFLATE(default)
+48 217 BFB763CE page.4 DEFLATE(default)
+49 215 2345D002 page.4 DEFLATE(default)
+50 216 771433DE page.4 DEFLATE(default)
+51 216 404A6D24 page.4 DEFLATE(default)
+52 217 2FA23916 page.4 DEFLATE(default)
+53 215 B8799605 page.4 DEFLATE(default)
+54 217 A05795C9 page.4 DEFLATE(default)
+55 215 E5927940 page.4 DEFLATE(default)
+56 386 903804C8 page.4 CBOR(known)
+57 386 5C51DD15 page.4 CBOR(known)
+58 386 70ECFC95 page.4 CBOR(known)
+59 386 6AA9AEFE page.4 CBOR(known)
+60 386 141DE14B page.4 CBOR(known)
+61 386 8F22229A page.5 CBOR(known)
+62 386 CAC721B1 page.5 CBOR(known)
+63 386 0AED21EB page.5 CBOR(known)
+64 386 6169B815 page.5 CBOR(known)
+65 386 0E702D5E page.5 CBOR(known)
+66 214 5A2D04E9 page.5 DEFLATE(maximum)
+67 386 FB39B2CF page.5 CBOR(known)
+68 214 BF504C7C page.5 DEFLATE(maximum)
+69 386 DF5B98B6 page.5 CBOR(known)
+70 212 579EBE80 page.5 DEFLATE(fastest)
+~~~
+
+# Checkpoints
+
+Field legend: `1UNAKPG` = first un-ACK'd page, `1UNAKSQ` = first un-ACK'd sequence number,
+`MINSEQN` = minimum sequence number, `ELEMNTS` = element count, `CHECKSM` = checksum.
+
+~~~
+# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.0
+VERSION [ 0001]: 1
+PAGENUM [ 00000000]: 0
+1UNAKPG [ 00000000]: 0
+1UNAKSQ [0000000000000007]: 7
+MINSEQN [0000000000000001]: 1
+ELEMNTS [ 0000000A]: 10
+CHECKSM [ C3C52167]
+# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.1
+VERSION [ 0001]: 1
+PAGENUM [ 00000001]: 1
+1UNAKPG [ 00000000]: 0
+1UNAKSQ [000000000000000B]: 11
+MINSEQN [000000000000000B]: 11
+ELEMNTS [ 0000000D]: 13
+CHECKSM [ 28208590]
+# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.2
+VERSION [ 0001]: 1
+PAGENUM [ 00000002]: 2
+1UNAKPG [ 00000000]: 0
+1UNAKSQ [0000000000000018]: 24
+MINSEQN [0000000000000018]: 24
+ELEMNTS [ 0000000D]: 13
+CHECKSM [ 33401485]
+# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.3
+VERSION [ 0001]: 1
+PAGENUM [ 00000003]: 3
+1UNAKPG [ 00000000]: 0
+1UNAKSQ [0000000000000025]: 37
+MINSEQN [0000000000000025]: 37
+ELEMNTS [ 0000000B]: 11
+CHECKSM [ 664CD0D8]
+# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.4
+VERSION [ 0001]: 1
+PAGENUM [ 00000004]: 4
+1UNAKPG [ 00000000]: 0
+1UNAKSQ [0000000000000030]: 48
+MINSEQN [0000000000000030]: 48
+ELEMNTS [ 0000000D]: 13
+CHECKSM [ 4057847E]
+# CHECKPOINT mixed-compression-queue-data-dir/queue/main/checkpoint.head
+VERSION [ 0001]: 1
+PAGENUM [ 00000005]: 5
+1UNAKPG [ 00000000]: 0
+1UNAKSQ [000000000000003D]: 61
+MINSEQN [000000000000003D]: 61
+ELEMNTS [ 0000000A]: 10
+CHECKSM [ C404251C]
+~~~
\ No newline at end of file
diff --git a/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.tar.gz b/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.tar.gz
new file mode 100644
index 00000000000..c5abcb94842
Binary files /dev/null and b/qa/integration/fixtures/data_dirs/mixed-compression-queue-data-dir.tar.gz differ
diff --git a/qa/integration/fixtures/pq_drain_spec.yml b/qa/integration/fixtures/pq_drain_spec.yml
new file mode 100644
index 00000000000..9b135a29e70
--- /dev/null
+++ b/qa/integration/fixtures/pq_drain_spec.yml
@@ -0,0 +1,3 @@
+---
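+# service(s) used by qa/integration/specs/pq_drain_spec.rb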
+services:
+ - logstash
\ No newline at end of file
diff --git a/qa/integration/specs/pq_drain_spec.rb b/qa/integration/specs/pq_drain_spec.rb
new file mode 100644
index 00000000000..b874f12e29d
--- /dev/null
+++ b/qa/integration/specs/pq_drain_spec.rb
@@ -0,0 +1,144 @@
+# Licensed to Elasticsearch B.V. under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Elasticsearch B.V. licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require_relative '../framework/fixture'
+require_relative '../framework/settings'
+require_relative '../services/logstash_service'
+require_relative '../framework/helpers'
+require "logstash/devutils/rspec/spec_helper"
+
+require 'stud/temporary'
+require 'json'
+require 'ostruct'
+require 'pathname'
+require 'shellwords'
+
+if ENV['FEATURE_FLAG'] == 'persistent_queues'
+ describe "Test logstash queue draining" do
+ before(:all) { @fixture = Fixture.new(__FILE__) }
+ after(:all) { @fixture&.teardown }
+
+ let(:logstash_service) { @fixture.get_service("logstash") }
+
+ shared_examples 'pq drain' do |queue_compression_setting|
+ let(:settings_flags) { super().merge('queue.drain' => true) }
+
+ around(:each) do |example|
+ Stud::Temporary.directory('data') do |tempdir|
+ # expand the fixture tarball into our temp data dir
+ data_dir_tarball = File.join(__dir__, '../fixtures/data_dirs/mixed-compression-queue-data-dir.tar.gz')
+          `tar --directory #{Shellwords.escape(tempdir)} --strip-components 1 -xzf #{Shellwords.escape(data_dir_tarball)}`
+
+ @data_dir = tempdir
+ example.call
+ end
+ end
+
+ around(:each) do |example|
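+        # give each example its own temp file in which to collect the drained events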
+ Stud::Temporary.file('output') do |tempfile|
+ @output_file = tempfile.path
+ example.call
+ end
+ end
+
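+      # a single generator event (type "seed") is produced in addition to whatever is drained
+      # from the fixture queue; the file output collects everything as JSON lines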
+ let(:pipeline) do
+ <<~PIPELINE
+          input { generator { count => 1 type => "seed" } }
+ output { file { path => "#{@output_file}" codec => json_lines } }
+ PIPELINE
+ end
+
+ it "reads the contents of the PQ and drains" do
+
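+        # for each page's checkpoint, `elements - (first_unacked_seq - min_sequence)` is the
+        # number of events on that page that have not yet been ACK'd; summing across all
+        # checkpoints gives the number of queued events a draining pipeline must still emit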
+ unacked_queued_elements = Pathname.new(@data_dir).glob('queue/main/checkpoint*').map { |cpf| decode_checkpoint(cpf) }
+ .map { |cp| (cp.elements - (cp.first_unacked_seq - cp.min_sequence)) }.reduce(&:+)
+
+ invoke_args = %W(
+ --log.level=debug
+ --path.settings=#{File.dirname(logstash_service.application_settings_file)}
+ --path.data=#{@data_dir}
+ --pipeline.workers=2
+ --pipeline.batch.size=8
+ --config.string=#{pipeline}
+ )
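+        # only pass an explicit compression goal when the example provides one; the nil case
+        # exercises the default `queue.compression` setting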
+ invoke_args << "-Squeue.compression=#{queue_compression_setting}" unless queue_compression_setting.nil?
+
+ status = logstash_service.run(*invoke_args)
+
+ aggregate_failures('process output') do
+ expect(status.exit_code).to be_zero
+ expect(status.stderr_and_stdout).to include("queue.type: persisted")
+ expect(status.stderr_and_stdout).to include("queue.drain: true")
+ expect(status.stderr_and_stdout).to include("queue.compression: #{queue_compression_setting}") unless queue_compression_setting.nil?
+ end
+
+ aggregate_failures('processing result') do
+ # count the events, make sure they're all the right shape.
+          expect(::File.size(@output_file)).to_not be_zero
+
+          written_events = ::File.read(@output_file).lines.map { |line| JSON.parse(line) }
+ expect(written_events.size).to eq(unacked_queued_elements + 1)
+ timestamps = written_events.map {|event| event['@timestamp'] }
+ expect(timestamps.uniq.size).to eq(written_events.size)
+ end
+
+ aggregate_failures('resulting queue state') do
+ # glob the data dir and make sure things have been cleaned up.
+ # we should only have a head checkpoint and a single fully-acked page.
+ checkpoints = Pathname.new(@data_dir).glob('queue/main/checkpoint*')
+ checkpoints.each { |cp| $stderr.puts("CP>#{cp}")}
+ expect(checkpoints.size).to eq(1)
+ expect(checkpoints.first.basename.to_s).to eq('checkpoint.head')
+ checkpoint = decode_checkpoint(checkpoints.first)
+ expect(checkpoint.first_unacked_page).to eq(checkpoint.page_number)
+ expect(checkpoint.first_unacked_seq).to eq(checkpoint.min_sequence + checkpoint.elements)
+
+ pages = Pathname.new(@data_dir).glob('queue/main/page*')
+ pages.each { |pg| $stderr.puts("PG>#{pg}")}
+ expect(pages.size).to eq(1)
+ end
+ end
+ end
+
+ context "`queue.compression` setting" do
+ %w(none speed balanced size).each do |explicit_compression_setting|
+ context "explicit `#{explicit_compression_setting}`" do
+ include_examples 'pq drain', explicit_compression_setting
+ end
+ end
+ context "default setting" do
+ include_examples 'pq drain', nil
+ end
+ end
+ end
+
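+  # Decode the fixed-width, big-endian fields of a PQ checkpoint file: version (2 bytes),
+  # page number (4), first unacked page (4), first unacked seq (8), min sequence (8), and
+  # element count (4); the trailing checksum is not needed here.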
+ def decode_checkpoint(path)
+ bytes = path.read(encoding: 'BINARY').bytes
+
+ bstoi = -> (bs) { bs.reduce(0) {|m,b| (m<<8)+b } }
+
+ version = bstoi[bytes.slice(0,2)]
+ pagenum = bstoi[bytes.slice(2,4)]
+ first_unacked_page = bstoi[bytes.slice(6,4)]
+ first_unacked_seq = bstoi[bytes.slice(10,8)]
+ min_sequence = bstoi[bytes.slice(18,8)]
+ elements = bstoi[bytes.slice(26,4)]
+
+ OpenStruct.new(version: version,
+ page_number: pagenum,
+ first_unacked_page: first_unacked_page,
+ first_unacked_seq: first_unacked_seq,
+ min_sequence: min_sequence,
+ elements: elements)
+ end
+end
\ No newline at end of file