4 changes: 4 additions & 0 deletions config/logstash.yml
@@ -223,6 +223,10 @@
#
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the compression goal. Valid values are `none`, `speed`, `balanced`, and `size`.
# The default `none` writes events uncompressed, but can still decompress previously-written events that were compressed.
#
# queue.compression: none
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
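For illustration, a minimal logstash.yml sketch that opts a persisted queue into compression; the `speed` goal here is an arbitrary choice:

queue.type: persisted
queue.compression: speed   # `none` (the default) still decompresses on read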
1 change: 1 addition & 0 deletions docker/data/logstash/env2yaml/env2yaml.go
@@ -58,6 +58,7 @@ var validSettings = []string{
"queue.checkpoint.acks",
"queue.checkpoint.writes",
"queue.checkpoint.interval", // remove it for #17155
"queue.compression",
"queue.drain",
"dead_letter_queue.enable",
"dead_letter_queue.max_bytes",
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/environment.rb
@@ -91,6 +91,7 @@ module Environment
Setting::SettingNumeric.new("queue.checkpoint.writes", 1024), # 0 is unlimited
Setting::SettingNumeric.new("queue.checkpoint.interval", 1000), # remove it for #17155
Setting::Boolean.new("queue.checkpoint.retry", true),
Setting::SettingString.new("queue.compression", "none", true, %w(none speed balanced size disabled)),
Setting::Boolean.new("dead_letter_queue.enable", false),
Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"),
Setting::SettingNumeric.new("dead_letter_queue.flush_interval", 5000),
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/settings.rb
@@ -69,6 +69,7 @@ def self.included(base)
"queue.checkpoint.interval", # remove it for #17155
"queue.checkpoint.writes",
"queue.checkpoint.retry",
"queue.compression",
"queue.drain",
"queue.max_bytes",
"queue.max_events",
1 change: 1 addition & 0 deletions logstash-core/spec/logstash/queue_factory_spec.rb
@@ -30,6 +30,7 @@
LogStash::Setting::SettingNumeric.new("queue.checkpoint.acks", 1024),
LogStash::Setting::SettingNumeric.new("queue.checkpoint.writes", 1024),
LogStash::Setting::Boolean.new("queue.checkpoint.retry", false),
LogStash::Setting::SettingString.new("queue.compression", "none", true, %w(none speed balanced size disabled)),
LogStash::Setting::SettingString.new("pipeline.id", pipeline_id),
LogStash::Setting::PositiveInteger.new("pipeline.batch.size", 125),
LogStash::Setting::PositiveInteger.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum)
@@ -0,0 +1,126 @@
package org.logstash.ackedqueue;

import org.logstash.util.CleanerThreadLocal;
import org.logstash.util.SetOnceReference;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.ref.Cleaner;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

/**
* Subclasses of {@link AbstractDeflateAwareCompressionCodec} are {@link CompressionCodec}s that are capable
* of detecting and decompressing deflate-compressed events. When decoding byte sequences that are <em>NOT</em>
* deflate-compressed, the given bytes are emitted verbatim.
*/
abstract class AbstractDeflateAwareCompressionCodec implements CompressionCodec {

static final int BAOS_SHAREABLE_THRESHOLD_BYTES = 4096;

static final Cleaner GLOBAL_CLEANER = Cleaner.create();

private final CleanerThreadLocal<BufferedInflater> bufferedInflaterThreadLocal;

public AbstractDeflateAwareCompressionCodec() {
this.bufferedInflaterThreadLocal = CleanerThreadLocal
.withInitial(BufferedInflater::new)
.withCleanAction(BufferedInflater::release, GLOBAL_CLEANER);
}

@Override
public byte[] decode(byte[] data) {
if (!isDeflate(data)) {
return data;
}
final BufferedInflater bufferedInflater = bufferedInflaterThreadLocal.get();
try {
return bufferedInflater.decode(data);
} catch (IOException e) {
throw new RuntimeException("IOException while decoding", e);
}
}

static boolean isDeflate(byte[] data) {
if (data.length < 2) { return false; }

// parse first two bytes as big-endian short
short header = (short) (((data[0] & 0xFF) << 8) | (data[1] & 0xFF));

/*
* RFC-1950: ZLIB Compressed Data Format Specification version 3.3
* https://www.ietf.org/rfc/rfc1950.txt
* ┏━━━━ CMF ━━━━━┳━━━━━━━━━━ FLG ━━━━━━━━━━┓
* ┠─CINFO─┬──CM──╂─FLEVEL─┬─FDICT─┬─FCHECK─┨
* ┃ 0XXX │ 1000 ┃ XX │ 0 │ XXXXX ┃
* ┗━━━━━━━┷━━━━━━┻━━━━━━━━┷━━━━━━━┷━━━━━━━━┛
* CINFO: 0XXX // always LTE 7 (7 is 32KB window)
* CM: 1000 // always 8 for deflate
* FDICT: 0 // always unset (no dictionary)
*
*/// 0XXX_1000_XX_0_XXXXX
short mask = (short) 0b1000_1111_00_1_00000; // bits to keep
short flip = (short) 0b0000_1000_00_0_00000; // bits to flip
short goal = (short) 0b0000_0000_00_0_00000; // goal state
if (((header & mask) ^ flip) != goal) {
return false;
}

// additionally the FCHECK ensures that
// the big-endian header is a multiple of 31
return header % 31 == 0;
}

/**
* A {@link BufferedInflater} is a convenience wrapper around the complexities
* of managing an {@link Inflater}, an intermediate {@code byte[]} buffer, and
* a {@link ByteArrayOutputStream}. It enables internal reuse of small buffers
* to reduce allocations.
*/
static class BufferedInflater {
private final Inflater inflater;
private final byte[] intermediateBuffer;
private final SetOnceReference<ByteArrayOutputStream> reusableBaosRef;

public BufferedInflater() {
this.inflater = new Inflater();
this.intermediateBuffer = new byte[1024];
this.reusableBaosRef = SetOnceReference.unset();
}

public byte[] decode(final byte[] data) throws IOException {
final ByteArrayOutputStream baos = getBaos(data.length);
try {
inflater.setInput(data);

do {
if (inflater.needsInput()) {
throw new IOException(String.format("prematurely reached end of encoded value (%s/%s)", inflater.getBytesRead(), inflater.getTotalIn()));
}
try {
int count = inflater.inflate(intermediateBuffer);
baos.write(intermediateBuffer, 0, count);
} catch (DataFormatException e) {
throw new RuntimeException("Failed to decode", e);
}
} while (!inflater.finished());

return baos.toByteArray();
} finally {
inflater.reset();
baos.reset();
}
}

public void release() {
inflater.end();
}

private ByteArrayOutputStream getBaos(final int encodedSize) {
if (encodedSize <= BAOS_SHAREABLE_THRESHOLD_BYTES) {
return this.reusableBaosRef.offerAndGet(() -> new ByteArrayOutputStream(BAOS_SHAREABLE_THRESHOLD_BYTES));
}
Comment on lines +114 to +116 (Member Author):

review note: if the thread never encounters a compressed payload that is small enough to safely use a shared (and therefore thread-permanent) BAOS, this is the bit that prevents us from taking on the overhead of that shareable BAOS.

return new ByteArrayOutputStream(encodedSize);
}
}
}
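As a quick sanity check of the header test above, here is a standalone sketch; looksLikeZlib is a local re-implementation of isDeflate for illustration only and is not part of this PR:

import java.nio.charset.StandardCharsets;

public class ZlibHeaderDemo {
    // mirrors the mask/flip/goal logic above: CM == 8, CINFO <= 7, FDICT unset,
    // and the big-endian header must be a multiple of 31 (FCHECK)
    static boolean looksLikeZlib(byte[] data) {
        if (data.length < 2) return false;
        int header = ((data[0] & 0xFF) << 8) | (data[1] & 0xFF);
        boolean cmIsDeflate = (data[0] & 0x0F) == 0x08;
        boolean cinfoInRange = ((data[0] & 0xFF) >>> 4) <= 7;
        boolean noDictionary = (data[1] & 0x20) == 0;
        return cmIsDeflate && cinfoInRange && noDictionary && header % 31 == 0;
    }

    public static void main(String[] args) {
        byte[] zlibDefault = {(byte) 0x78, (byte) 0x9C}; // header java.util.zip emits at default level
        byte[] plainJson = "{\"message\":\"ok\"}".getBytes(StandardCharsets.UTF_8); // '{' is 0x7B: CM nibble is 0xB, not 8
        System.out.println(looksLikeZlib(zlibDefault)); // true
        System.out.println(looksLikeZlib(plainJson));   // false
    }
}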
@@ -1,6 +1,12 @@
package org.logstash.ackedqueue;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.zip.Deflater;

public interface CompressionCodec {
Logger LOGGER = LogManager.getLogger(CompressionCodec.class);

byte[] encode(byte[] data);
byte[] decode(byte[] data);
@@ -21,4 +27,34 @@ public byte[] decode(byte[] data) {
return data;
}
};

static CompressionCodec fromConfigValue(final String configValue) {
return fromConfigValue(configValue, LOGGER);
}

static CompressionCodec fromConfigValue(final String configValue, final Logger logger) {
return switch (configValue) {
case "disabled" -> {
logger.warn("compression support has been disabled");
yield CompressionCodec.NOOP;
}
case "none" -> {
logger.info("compression support is enabled (read-only)");
Comment (Member):

Looking at a log entry saying "read-only" can confuse users into thinking the PQ is in a read-only mode.

Suggested change:
-            logger.info("compression support is enabled (read-only)");
+            logger.info("compression support is enabled (decompression only)");

yield DeflateAwareCompressionCodec.getInstance();
}
case "speed" -> {
logger.info("compression support is enabled (goal: speed)");
Comment (Member):

"Goal" is a very comp-sci term, and it suggests "we'll do our best, but no promises". I don't think it's necessary to pass this subjectivity on to users, so maybe something like:

Suggested change:
-            logger.info("compression support is enabled (goal: speed)");
+            logger.info("Compression is enabled - level: \"speed\"");

Or: logger.info("Compression level set to: speed");

yield new DeflateEnabledCompressionCodec(Deflater.BEST_SPEED);
}
case "balanced" -> {
logger.info("compression support is enabled (goal: balanced)");
yield new DeflateEnabledCompressionCodec(Deflater.DEFAULT_COMPRESSION);
}
case "size" -> {
logger.info("compression support is enabled (goal: size)");
yield new DeflateEnabledCompressionCodec(Deflater.BEST_COMPRESSION);
}
default -> throw new IllegalArgumentException(String.format("Unsupported compression setting `%s`", configValue));
};
}
}
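A minimal sketch of the codec contract defined above; it assumes the demo can import org.logstash.ackedqueue.CompressionCodec, and it only exercises the public factory method:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.logstash.ackedqueue.CompressionCodec;

public class CodecRoundTripDemo {
    public static void main(String[] args) {
        CompressionCodec writer = CompressionCodec.fromConfigValue("speed"); // compresses on encode
        CompressionCodec reader = CompressionCodec.fromConfigValue("none");  // passthrough encode, deflate-aware decode

        byte[] original = "{\"message\":\"hello\"}".getBytes(StandardCharsets.UTF_8);
        byte[] stored = writer.encode(original);
        byte[] restored = reader.decode(stored); // header detected, bytes inflated

        System.out.println(Arrays.equals(original, restored)); // true
    }
}

This is why the default `none` is safe on an upgrade path: a queue written under `speed`, `balanced`, or `size` can still be read back after compression is turned off.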
@@ -0,0 +1,18 @@
package org.logstash.ackedqueue;

/**
* A {@link DeflateAwareCompressionCodec} is a {@link CompressionCodec} that can decode deflate-compressed
* bytes, but performs no compression when encoding.
*/
class DeflateAwareCompressionCodec extends AbstractDeflateAwareCompressionCodec {
private static final DeflateAwareCompressionCodec INSTANCE = new DeflateAwareCompressionCodec();

static DeflateAwareCompressionCodec getInstance() {
return INSTANCE;
}

@Override
public byte[] encode(byte[] data) {
return data;
}
}
@@ -0,0 +1,83 @@
package org.logstash.ackedqueue;

import org.logstash.util.CleanerThreadLocal;
import org.logstash.util.SetOnceReference;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HexFormat;
import java.util.zip.Deflater;

/**
* A {@link DeflateEnabledCompressionCodec} is a {@link CompressionCodec} that can decode deflate-compressed
* bytes and performs deflate compression when encoding.
*/
class DeflateEnabledCompressionCodec extends AbstractDeflateAwareCompressionCodec {

private final CleanerThreadLocal<BufferedDeflater> bufferedDeflaterThreadLocal;

DeflateEnabledCompressionCodec(final int level) {
this.bufferedDeflaterThreadLocal = CleanerThreadLocal
.withInitial(() -> new BufferedDeflater(level))
.withCleanAction(BufferedDeflater::release, GLOBAL_CLEANER);
}

@Override
public byte[] encode(byte[] data) {
final BufferedDeflater bufferedDeflater = bufferedDeflaterThreadLocal.get();
try {
return bufferedDeflater.encode(data);
} catch (IOException e) {
throw new RuntimeException("IOException while encoding", e);
}
}

/**
* A {@link BufferedDeflater} is a convenience wrapper around the complexities
* of managing a {@link Deflater}, an intermediate {@code byte[]} buffer, and
* a {@link ByteArrayOutputStream}. It enables internal reuse of small buffers
* to reduce allocations.
*/
static class BufferedDeflater {
private final Deflater deflater;
private final byte[] intermediateBuffer;
private final SetOnceReference<ByteArrayOutputStream> reusableBaosRef;

public BufferedDeflater(final int level) {
this.deflater = new Deflater(level);
this.intermediateBuffer = new byte[1024];
this.reusableBaosRef = SetOnceReference.unset();
}

public byte[] encode(final byte[] data) throws IOException {
final ByteArrayOutputStream baos = getBaos(data.length);
try {
deflater.setInput(data);
deflater.finish();

while (!deflater.finished()) {
int count = deflater.deflate(intermediateBuffer);
baos.write(intermediateBuffer, 0, count);
}
byte[] encodedBytes = baos.toByteArray();
assert isDeflate(encodedBytes) : String.format("invalid deflate signature `%s`", HexFormat.of().formatHex(encodedBytes, 0, 2));
return encodedBytes;
} finally {
deflater.reset();
baos.reset();
}
}

public void release() {
deflater.end();
}

private ByteArrayOutputStream getBaos(final int decodedSize) {
if (decodedSize <= BAOS_SHAREABLE_THRESHOLD_BYTES) {
return this.reusableBaosRef.offerAndGet(() -> new ByteArrayOutputStream(BAOS_SHAREABLE_THRESHOLD_BYTES));
}
return new ByteArrayOutputStream(decodedSize);
}
}
}
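The assert on the deflate signature holds because java.util.zip.Deflater, constructed without nowrap, always emits an RFC-1950 zlib header. A small sketch showing the headers for the three levels this codec is built with:

import java.util.zip.Deflater;

public class DeflateHeaderDemo {
    public static void main(String[] args) {
        int[] levels = {Deflater.BEST_SPEED, Deflater.DEFAULT_COMPRESSION, Deflater.BEST_COMPRESSION};
        for (int level : levels) {
            Deflater deflater = new Deflater(level); // zlib wrapper mode
            deflater.setInput("hello".getBytes());
            deflater.finish();
            byte[] out = new byte[64];
            deflater.deflate(out);
            deflater.end();
            // expected: 7801 (speed), 789c (default/balanced), 78da (size); all multiples of 31
            System.out.printf("level %2d -> header %02x%02x%n", level, out[0] & 0xFF, out[1] & 0xFF);
        }
    }
}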
@@ -24,6 +24,9 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.Ruby;
import org.jruby.RubyBasicObject;
import org.jruby.RubyClass;
@@ -63,6 +66,8 @@ public final class QueueFactoryExt extends RubyBasicObject {

private static final long serialVersionUID = 1L;

private static final Logger LOGGER = LogManager.getLogger(QueueFactoryExt.class);

public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@@ -123,6 +128,13 @@ private static Settings extractQueueSettings(final IRubyObject settings) {
.checkpointMaxAcks(getSetting(context, settings, QUEUE_CHECKPOINT_ACKS).toJava(Integer.class))
.checkpointRetry(getSetting(context, settings, QUEUE_CHECKPOINT_RETRY).isTrue())
.queueMaxBytes(getSetting(context, settings, QUEUE_MAX_BYTES).toJava(Integer.class))
.compressionCodec(extractConfiguredCodec(settings))
.build();
}

private static CompressionCodec extractConfiguredCodec(final IRubyObject settings) {
final ThreadContext context = settings.getRuntime().getCurrentContext();
final String compressionSetting = getSetting(context, settings, QUEUE_COMPRESSION).asJavaString();
return CompressionCodec.fromConfigValue(compressionSetting, LOGGER);
}
}
@@ -43,4 +43,6 @@ public class SettingKeyDefinitions {
public static final String QUEUE_CHECKPOINT_RETRY = "queue.checkpoint.retry";

public static final String QUEUE_MAX_BYTES = "queue.max_bytes";

public static final String QUEUE_COMPRESSION = "queue.compression";
}