From 36ee3e40fc613ef2da10516a8561585289c6ebb9 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Mon, 11 Aug 2025 14:45:01 +0000 Subject: [PATCH 1/5] noop: refactor pq settings to make future changes simpler The `ackedqueue.SettingsImpl` uses an _immutable_ builder, which makes adding options cumbersome; each additional property added needs to modify code from all existing options. By introducing an api-internal temporary mutable builder, we can simplify the process of creating an immutable copy that has a single component modified. --- .../org/logstash/ackedqueue/SettingsImpl.java | 136 ++++++++++-------- 1 file changed, 73 insertions(+), 63 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java b/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java index dd23001886f..b117af717a2 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java @@ -20,6 +20,8 @@ package org.logstash.ackedqueue; +import java.util.function.Consumer; + /** * Persistent queue settings implementation. * */ @@ -34,27 +36,22 @@ public class SettingsImpl implements Settings { private boolean checkpointRetry; public static Builder builder(final Settings settings) { - return new BuilderImpl(settings.getDirPath(), settings.getElementClass(), settings.getCapacity(), - settings.getQueueMaxBytes(), settings.getMaxUnread(), settings.getCheckpointMaxAcks(), - settings.getCheckpointMaxWrites(), settings.getCheckpointRetry() - ); + return new BuilderImpl(settings); } public static Builder fileSettingsBuilder(final String dirForFiles) { return new BuilderImpl(dirForFiles); } - private SettingsImpl(final String dirForFiles, final Class elementClass, - final int capacity, final long queueMaxBytes, final int maxUnread, - final int checkpointMaxAcks, final int checkpointMaxWrites, boolean checkpointRetry) { - this.dirForFiles = dirForFiles; - this.elementClass = elementClass; - this.capacity = capacity; - this.queueMaxBytes = queueMaxBytes; - this.maxUnread = maxUnread; - this.checkpointMaxAcks = checkpointMaxAcks; - this.checkpointMaxWrites = checkpointMaxWrites; - this.checkpointRetry = checkpointRetry; + private SettingsImpl(final BuilderImpl builder) { + this.dirForFiles = builder.dirForFiles; + this.elementClass = builder.elementClass; + this.capacity = builder.capacity; + this.queueMaxBytes = builder.queueMaxBytes; + this.maxUnread = builder.maxUnread; + this.checkpointMaxAcks = builder.checkpointMaxAcks; + this.checkpointMaxWrites = builder.checkpointMaxWrites; + this.checkpointRetry = builder.checkpointRetry; } @Override @@ -147,88 +144,101 @@ private static final class BuilderImpl implements Builder { private final boolean checkpointRetry; private BuilderImpl(final String dirForFiles) { - this(dirForFiles, null, DEFAULT_CAPACITY, DEFAULT_MAX_QUEUE_BYTES, - DEFAULT_MAX_UNREAD, DEFAULT_CHECKPOINT_MAX_ACKS, DEFAULT_CHECKPOINT_MAX_WRITES, false - ); + this.dirForFiles = dirForFiles; + this.elementClass = null; + this.capacity = DEFAULT_CAPACITY; + this.queueMaxBytes = DEFAULT_MAX_QUEUE_BYTES; + this.maxUnread = DEFAULT_MAX_UNREAD; + this.checkpointMaxAcks = DEFAULT_CHECKPOINT_MAX_ACKS; + this.checkpointMaxWrites = DEFAULT_CHECKPOINT_MAX_WRITES; + this.checkpointRetry = false; } - private BuilderImpl(final String dirForFiles, final Class elementClass, - final int capacity, final long queueMaxBytes, final int maxUnread, - final int checkpointMaxAcks, final int checkpointMaxWrites, final boolean checkpointRetry) { - this.dirForFiles = dirForFiles; - this.elementClass = elementClass; - this.capacity = capacity; - this.queueMaxBytes = queueMaxBytes; - this.maxUnread = maxUnread; - this.checkpointMaxAcks = checkpointMaxAcks; - this.checkpointMaxWrites = checkpointMaxWrites; - this.checkpointRetry = checkpointRetry; + private BuilderImpl(final Settings settings) { + this.dirForFiles = settings.getDirPath(); + this.elementClass = settings.getElementClass(); + this.capacity = settings.getCapacity(); + this.queueMaxBytes = settings.getQueueMaxBytes(); + this.maxUnread = settings.getMaxUnread(); + this.checkpointMaxAcks = settings.getCheckpointMaxAcks(); + this.checkpointMaxWrites = settings.getCheckpointMaxWrites(); + this.checkpointRetry = settings.getCheckpointRetry(); + } + + private BuilderImpl(final MutableBuilder mutableBuilder) { + this.dirForFiles = mutableBuilder.dirForFiles; + this.elementClass = mutableBuilder.elementClass; + this.capacity = mutableBuilder.capacity; + this.queueMaxBytes = mutableBuilder.queueMaxBytes; + this.maxUnread = mutableBuilder.maxUnread; + this.checkpointMaxAcks = mutableBuilder.checkpointMaxAcks; + this.checkpointMaxWrites = mutableBuilder.checkpointMaxWrites; + this.checkpointRetry = mutableBuilder.checkpointRetry; } @Override public Builder elementClass(final Class elementClass) { - return new BuilderImpl( - this.dirForFiles, elementClass, this.capacity, this.queueMaxBytes, this.maxUnread, - this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry - ); + return mutate(mutable -> mutable.elementClass = elementClass); } @Override public Builder capacity(final int capacity) { - return new BuilderImpl( - this.dirForFiles, this.elementClass, capacity, this.queueMaxBytes, this.maxUnread, - this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry - ); + return mutate(mutable -> mutable.capacity = capacity); } @Override public Builder queueMaxBytes(final long size) { - return new BuilderImpl( - this.dirForFiles, this.elementClass, this.capacity, size, this.maxUnread, - this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry - ); + return mutate(mutable -> mutable.queueMaxBytes = size); } @Override public Builder maxUnread(final int maxUnread) { - return new BuilderImpl( - this.dirForFiles, this.elementClass, - this.capacity, this.queueMaxBytes, maxUnread, this.checkpointMaxAcks, - this.checkpointMaxWrites, this.checkpointRetry - ); + return mutate(mutable -> mutable.maxUnread = maxUnread); } @Override public Builder checkpointMaxAcks(final int checkpointMaxAcks) { - return new BuilderImpl( - this.dirForFiles, this.elementClass, - this.capacity, this.queueMaxBytes, this.maxUnread, checkpointMaxAcks, - this.checkpointMaxWrites, this.checkpointRetry - ); + return mutate(mutable -> mutable.checkpointMaxAcks = checkpointMaxAcks); } @Override public Builder checkpointMaxWrites(final int checkpointMaxWrites) { - return new BuilderImpl( - this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes, - this.maxUnread, this.checkpointMaxAcks, checkpointMaxWrites, this.checkpointRetry - ); + return mutate(mutable -> mutable.checkpointMaxWrites = checkpointMaxWrites); } @Override public Builder checkpointRetry(final boolean checkpointRetry) { - return new BuilderImpl( - this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes, - this.maxUnread, this.checkpointMaxAcks, checkpointMaxWrites, checkpointRetry - ); + return mutate(mutable -> mutable.checkpointRetry = checkpointRetry); } @Override public Settings build() { - return new SettingsImpl( - this.dirForFiles, this.elementClass, this.capacity, this.queueMaxBytes, - this.maxUnread, this.checkpointMaxAcks, this.checkpointMaxWrites, this.checkpointRetry - ); + return new SettingsImpl(this); + } + + private Builder mutate(final Consumer mutator) { + final MutableBuilder mutableBuilder = new MutableBuilder(); + mutator.accept(mutableBuilder); + return mutableBuilder.toBuilder(); + } + + /** + * A {@link MutableBuilder} is an internal detail of the immutable {@link BuilderImpl} + * that allows its private {@link BuilderImpl#mutate} to work with a temporary mutable copy. + */ + private class MutableBuilder { + protected String dirForFiles = BuilderImpl.this.dirForFiles; + protected Class elementClass = BuilderImpl.this.elementClass; + protected int capacity = BuilderImpl.this.capacity; + protected long queueMaxBytes = BuilderImpl.this.queueMaxBytes; + protected int maxUnread = BuilderImpl.this.maxUnread; + protected int checkpointMaxAcks = BuilderImpl.this.checkpointMaxAcks; + protected int checkpointMaxWrites = BuilderImpl.this.checkpointMaxWrites; + protected boolean checkpointRetry = BuilderImpl.this.checkpointRetry; + + Builder toBuilder() { + return new BuilderImpl(this); + } } } } From a555131af6220a6798a02fa441f2bf84c9865862 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Mon, 11 Aug 2025 21:07:23 +0000 Subject: [PATCH 2/5] pq settings: validate while building --- .../org/logstash/ackedqueue/Settings.java | 29 +++++++++++++++++++ .../org/logstash/ackedqueue/SettingsImpl.java | 2 +- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java index 45b34fe0fd0..1623738659e 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Settings.java @@ -20,6 +20,9 @@ package org.logstash.ackedqueue; +import java.util.ArrayList; +import java.util.List; + /** * Persistent queue settings definition. * */ @@ -41,6 +44,32 @@ public interface Settings { boolean getCheckpointRetry(); + /** + * Validate and return the settings, or throw descriptive {@link QueueRuntimeException} + * @param settings the settings to validate + * @return the settings that were provided + */ + static Settings ensureValid(final Settings settings) { + final List errors = new ArrayList<>(); + + if (settings == null) { + errors.add("settings cannot be null"); + } else { + if (settings.getDirPath() == null) { + errors.add("dirPath cannot be null"); + } + if (settings.getElementClass() == null) { + errors.add("elementClass cannot be null"); + } + } + + if (!errors.isEmpty()) { + throw new QueueRuntimeException(String.format("Invalid Queue Settings: %s", errors)); + } + + return settings; + } + /** * Persistent queue Setting's fluent builder definition * */ diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java b/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java index b117af717a2..1d9ba67b0f8 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java @@ -213,7 +213,7 @@ public Builder checkpointRetry(final boolean checkpointRetry) { @Override public Settings build() { - return new SettingsImpl(this); + return Settings.ensureValid(new SettingsImpl(this)); } private Builder mutate(final Consumer mutator) { From 488f81c0ddf9be189a61f76b950c57754407fdd3 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Mon, 11 Aug 2025 21:11:38 +0000 Subject: [PATCH 3/5] noop: refacor pq-related constructors to take ackedqueue.Settings --- .../acked_queue_concurrent_stress_spec.rb | 11 +++++- .../instrument/wrapped_write_client_spec.rb | 13 ++++++- .../logstash/util/wrapped_acked_queue_spec.rb | 13 ++++++- .../logstash/ackedqueue/QueueFactoryExt.java | 39 +++++++++++-------- .../ackedqueue/ext/JRubyAckedQueueExt.java | 36 +++++++++-------- .../ext/JRubyWrappedAckedQueueExt.java | 39 ++++++++++++------- 6 files changed, 101 insertions(+), 50 deletions(-) diff --git a/logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb b/logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb index 0b551494698..ccd37f6bb4e 100644 --- a/logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb +++ b/logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb @@ -28,8 +28,17 @@ let(:output_strings) { [] } let(:reject_memo_keys) { [:reject_memo_keys, :path, :queue, :writer_threads, :collector, :metric, :reader_threads, :output_strings] } + let(:queue_settings) do + LogStash::AckedQueue.file_settings_builder(path) + .capacity(page_capacity) + .checkpointMaxAcks(queue_checkpoint_acks) + .checkpointMaxWrites(queue_checkpoint_writes) + .queueMaxBytes(queue_capacity) + .build + end + let(:queue) do - described_class.new(path, page_capacity, 0, queue_checkpoint_acks, queue_checkpoint_writes, false, queue_capacity) + described_class.new(queue_settings) end let(:writer_threads) do diff --git a/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb b/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb index 171525a30a3..1c1f9a1fb2d 100644 --- a/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb +++ b/logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb @@ -125,7 +125,18 @@ def threaded_read_client context "WrappedAckedQueue" do let(:path) { Stud::Temporary.directory } - let(:queue) { LogStash::WrappedAckedQueue.new(path, 1024, 10, 1024, 1024, false, 4096) } + + let(:queue_settings) do + LogStash::AckedQueue.file_settings_builder(path) + .capacity(1024) + .maxUnread(10) + .checkpointMaxAcks(1024) + .checkpointMaxWrites(1024) + .queueMaxBytes(4096) + .build + end + + let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings) } before do read_client.set_events_metric(metric.namespace([:stats, :events])) diff --git a/logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb b/logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb index 5c729196c4e..67093a78eea 100644 --- a/logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb +++ b/logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb @@ -53,7 +53,18 @@ let(:checkpoint_acks) { 1024 } let(:checkpoint_writes) { 1024 } let(:path) { Stud::Temporary.directory } - let(:queue) { LogStash::WrappedAckedQueue.new(path, page_capacity, max_events, checkpoint_acks, checkpoint_writes, false, max_bytes) } + + let(:queue_settings) do + LogStash::AckedQueue.file_settings_builder(path) + .capacity(page_capacity) + .maxUnread(max_events) + .checkpointMaxAcks(checkpoint_acks) + .checkpointMaxWrites(checkpoint_writes) + .queueMaxBytes(max_bytes) + .build + end + + let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings) } after do queue.close diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java index bcc9b139e58..6a10c2a3e7f 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java @@ -31,12 +31,15 @@ import org.jruby.anno.JRubyMethod; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.Event; import org.logstash.RubyUtil; import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt; import org.logstash.common.SettingKeyDefinitions; import org.logstash.execution.AbstractWrappedQueueExt; import org.logstash.ext.JrubyWrappedSynchronousQueueExt; +import static org.logstash.common.SettingKeyDefinitions.*; + /** * Persistent queue factory JRuby extension. * */ @@ -69,10 +72,8 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final final IRubyObject settings) throws IOException { final String type = getSetting(context, settings, QUEUE_TYPE_CONTEXT_NAME).asJavaString(); if (PERSISTED_TYPE.equals(type)) { - final Path queuePath = Paths.get( - getSetting(context, settings, SettingKeyDefinitions.PATH_QUEUE).asJavaString(), - getSetting(context, settings, SettingKeyDefinitions.PIPELINE_ID).asJavaString() - ); + final Settings queueSettings = extractQueueSettings(settings); + final Path queuePath = Paths.get(queueSettings.getDirPath()); // Files.createDirectories raises a FileAlreadyExistsException // if pipeline queue path is a symlink, so worth checking against Files.exists @@ -80,18 +81,7 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final Files.createDirectories(queuePath); } - return new JRubyWrappedAckedQueueExt(context.runtime, RubyUtil.WRAPPED_ACKED_QUEUE_CLASS) - .initialize( - context, new IRubyObject[]{ - context.runtime.newString(queuePath.toString()), - getSetting(context, settings, SettingKeyDefinitions.QUEUE_PAGE_CAPACITY), - getSetting(context, settings, SettingKeyDefinitions.QUEUE_MAX_EVENTS), - getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_WRITES), - getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_ACKS), - getSetting(context, settings, SettingKeyDefinitions.QUEUE_CHECKPOINT_RETRY), - getSetting(context, settings, SettingKeyDefinitions.QUEUE_MAX_BYTES) - } - ); + return JRubyWrappedAckedQueueExt.create(context, queueSettings); } else if (MEMORY_TYPE.equals(type)) { return new JrubyWrappedSynchronousQueueExt( context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS @@ -118,4 +108,21 @@ private static IRubyObject getSetting(final ThreadContext context, final IRubyOb final String name) { return settings.callMethod(context, "get_value", context.runtime.newString(name)); } + + private static Settings extractQueueSettings(final IRubyObject settings) { + final ThreadContext context = settings.getRuntime().getCurrentContext(); + final Path queuePath = Paths.get( + getSetting(context, settings, PATH_QUEUE).asJavaString(), + getSetting(context, settings, PIPELINE_ID).asJavaString() + ); + return SettingsImpl.fileSettingsBuilder(queuePath.toString()) + .elementClass(Event.class) + .capacity(getSetting(context, settings, QUEUE_PAGE_CAPACITY).toJava(Integer.class)) + .maxUnread(getSetting(context, settings, QUEUE_MAX_EVENTS).toJava(Integer.class)) + .checkpointMaxWrites(getSetting(context, settings, QUEUE_CHECKPOINT_WRITES).toJava(Integer.class)) + .checkpointMaxAcks(getSetting(context, settings, QUEUE_CHECKPOINT_ACKS).toJava(Integer.class)) + .checkpointRetry(getSetting(context, settings, QUEUE_CHECKPOINT_RETRY).isTrue()) + .queueMaxBytes(getSetting(context, settings, QUEUE_MAX_BYTES).toJava(Integer.class)) + .build(); + } } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java index c65cb7c2df3..35cb765dbc9 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java @@ -31,6 +31,7 @@ import org.jruby.RubyString; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; +import org.jruby.javasupport.JavaUtil; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.Event; @@ -39,6 +40,7 @@ import org.logstash.ackedqueue.Batch; import org.logstash.ackedqueue.Queue; import org.logstash.ackedqueue.QueueExceptionMessages; +import org.logstash.ackedqueue.Settings; import org.logstash.ackedqueue.SettingsImpl; /** @@ -60,27 +62,27 @@ public Queue getQueue() { return this.queue; } - public static JRubyAckedQueueExt create(String path, int capacity, int maxEvents, int checkpointMaxWrites, - int checkpointMaxAcks, boolean checkpointRetry, long maxBytes) { + public static JRubyAckedQueueExt create(final Settings settings) { JRubyAckedQueueExt queueExt = new JRubyAckedQueueExt(RubyUtil.RUBY, RubyUtil.ACKED_QUEUE_CLASS); - queueExt.initializeQueue(path, capacity, maxEvents, checkpointMaxWrites, checkpointMaxAcks, checkpointRetry, - maxBytes); + queueExt.queue = new Queue(settings); return queueExt; } - private void initializeQueue(String path, int capacity, int maxEvents, int checkpointMaxWrites, - int checkpointMaxAcks, boolean checkpointRetry, long maxBytes) { - this.queue = new Queue( - SettingsImpl.fileSettingsBuilder(path) - .capacity(capacity) - .maxUnread(maxEvents) - .queueMaxBytes(maxBytes) - .checkpointMaxAcks(checkpointMaxAcks) - .checkpointMaxWrites(checkpointMaxWrites) - .checkpointRetry(checkpointRetry) - .elementClass(Event.class) - .build() - ); + /** + * Helper method for retrieving a ruby-usable {@link Settings.Builder} with the provided path as its directory, + * using {@link Event} as its element-type. + * + * @param context the ruby thread context + * @param recv noop receiver (will be rubified LogStash::AckedQueue class) + * @param path the path to the queue + * @return a ruby-usable proxy for {@link Settings.Builder} + */ + @JRubyMethod(meta = true, name = "file_settings_builder") + public static IRubyObject fileSettingsBuilder(final ThreadContext context, IRubyObject recv, final RubyString path) { + final Settings.Builder settingsBuilder = SettingsImpl + .fileSettingsBuilder(path.asJavaString()) + .elementClass(Event.class); + return JavaUtil.convertJavaToRuby(context.runtime, settingsBuilder); } @JRubyMethod(name = "max_unread_events") diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java index 257ce824214..15b95ec8dea 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java @@ -25,13 +25,13 @@ import org.jruby.Ruby; import org.jruby.RubyBoolean; import org.jruby.RubyClass; -import org.jruby.RubyFixnum; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; -import org.jruby.runtime.Arity; +import org.jruby.javasupport.JavaUtil; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; +import org.logstash.ackedqueue.Settings; import org.logstash.execution.AbstractWrappedQueueExt; import org.logstash.execution.QueueReadClientBase; import org.logstash.ext.JRubyAbstractQueueWriteClientExt; @@ -49,23 +49,34 @@ public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt { private JRubyAckedQueueExt queue; - @JRubyMethod(optional = 8) - public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] args) throws IOException { - args = Arity.scanArgs(context.runtime, args, 7, 0); - int capacity = RubyFixnum.num2int(args[1]); - int maxEvents = RubyFixnum.num2int(args[2]); - int checkpointMaxWrites = RubyFixnum.num2int(args[3]); - int checkpointMaxAcks = RubyFixnum.num2int(args[4]); - boolean checkpointRetry = !((RubyBoolean) args[5]).isFalse(); - long queueMaxBytes = RubyFixnum.num2long(args[6]); - - this.queue = JRubyAckedQueueExt.create(args[0].asJavaString(), capacity, maxEvents, - checkpointMaxWrites, checkpointMaxAcks, checkpointRetry, queueMaxBytes); + @JRubyMethod(required=1) + public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject settings) throws IOException { + if (JavaUtil.isJavaObject(settings)) { + this.queue = JRubyAckedQueueExt.create(JavaUtil.unwrapJavaObject(settings)); + } else { + // We should never get here, but previously had an initialize method + // that took 7 technically-optional ordered parameters. + throw new IllegalArgumentException( + String.format( + "Failed to instantiate JRubyWrappedAckedQueueExt with <%s:%s>", + settings.getClass().getName(), + settings)); + } this.queue.open(); return this; } + public static JRubyWrappedAckedQueueExt create(ThreadContext context, Settings settings) throws IOException { + return new JRubyWrappedAckedQueueExt(context.runtime, RubyUtil.WRAPPED_ACKED_QUEUE_CLASS, settings); + } + + public JRubyWrappedAckedQueueExt(Ruby runtime, RubyClass metaClass, Settings settings) throws IOException { + super(runtime, metaClass); + this.queue = JRubyAckedQueueExt.create(settings); + this.queue.open(); + } + public JRubyWrappedAckedQueueExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); } From 2200af6229c5d71e167b8abceed5adad98c7b049 Mon Sep 17 00:00:00 2001 From: Rye Biesemeyer Date: Thu, 18 Sep 2025 09:02:07 -0700 Subject: [PATCH 4/5] pq init refactor: assertion style Co-authored-by: Andrea Selva --- .../logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java index 15b95ec8dea..d2d374b56f7 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java @@ -51,9 +51,7 @@ public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt { @JRubyMethod(required=1) public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject settings) throws IOException { - if (JavaUtil.isJavaObject(settings)) { - this.queue = JRubyAckedQueueExt.create(JavaUtil.unwrapJavaObject(settings)); - } else { + if (!JavaUtil.isJavaObject(settings)) { // We should never get here, but previously had an initialize method // that took 7 technically-optional ordered parameters. throw new IllegalArgumentException( @@ -62,6 +60,7 @@ public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject s settings.getClass().getName(), settings)); } + this.queue = JRubyAckedQueueExt.create(JavaUtil.unwrapJavaObject(settings)); this.queue.open(); return this; From 7c2bfd1445468bf403906267171c6f71d8ed21ed Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Thu, 18 Sep 2025 17:12:01 +0000 Subject: [PATCH 5/5] pq settings: builder need not be immutable, but settings should be --- .../org/logstash/ackedqueue/SettingsImpl.java | 90 ++++++------------- 1 file changed, 29 insertions(+), 61 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java b/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java index 1d9ba67b0f8..bc191f44a32 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/SettingsImpl.java @@ -17,23 +17,20 @@ * under the License. */ - package org.logstash.ackedqueue; -import java.util.function.Consumer; - /** * Persistent queue settings implementation. * */ public class SettingsImpl implements Settings { - private String dirForFiles; - private Class elementClass; - private int capacity; - private long queueMaxBytes; - private int maxUnread; - private int checkpointMaxAcks; - private int checkpointMaxWrites; - private boolean checkpointRetry; + private final String dirForFiles; + private final Class elementClass; + private final int capacity; + private final long queueMaxBytes; + private final int maxUnread; + private final int checkpointMaxAcks; + private final int checkpointMaxWrites; + private final boolean checkpointRetry; public static Builder builder(final Settings settings) { return new BuilderImpl(settings); @@ -129,19 +126,19 @@ private static final class BuilderImpl implements Builder { private final String dirForFiles; - private final Class elementClass; + private Class elementClass; - private final int capacity; + private int capacity; - private final long queueMaxBytes; + private long queueMaxBytes; - private final int maxUnread; + private int maxUnread; - private final int checkpointMaxAcks; + private int checkpointMaxAcks; - private final int checkpointMaxWrites; + private int checkpointMaxWrites; - private final boolean checkpointRetry; + private boolean checkpointRetry; private BuilderImpl(final String dirForFiles) { this.dirForFiles = dirForFiles; @@ -165,80 +162,51 @@ private BuilderImpl(final Settings settings) { this.checkpointRetry = settings.getCheckpointRetry(); } - private BuilderImpl(final MutableBuilder mutableBuilder) { - this.dirForFiles = mutableBuilder.dirForFiles; - this.elementClass = mutableBuilder.elementClass; - this.capacity = mutableBuilder.capacity; - this.queueMaxBytes = mutableBuilder.queueMaxBytes; - this.maxUnread = mutableBuilder.maxUnread; - this.checkpointMaxAcks = mutableBuilder.checkpointMaxAcks; - this.checkpointMaxWrites = mutableBuilder.checkpointMaxWrites; - this.checkpointRetry = mutableBuilder.checkpointRetry; - } - @Override public Builder elementClass(final Class elementClass) { - return mutate(mutable -> mutable.elementClass = elementClass); + this.elementClass = elementClass; + return this; } @Override public Builder capacity(final int capacity) { - return mutate(mutable -> mutable.capacity = capacity); + this.capacity = capacity; + return this; } @Override public Builder queueMaxBytes(final long size) { - return mutate(mutable -> mutable.queueMaxBytes = size); + this.queueMaxBytes = size; + return this; } @Override public Builder maxUnread(final int maxUnread) { - return mutate(mutable -> mutable.maxUnread = maxUnread); + this.maxUnread = maxUnread; + return this; } @Override public Builder checkpointMaxAcks(final int checkpointMaxAcks) { - return mutate(mutable -> mutable.checkpointMaxAcks = checkpointMaxAcks); + this.checkpointMaxAcks = checkpointMaxAcks; + return this; } @Override public Builder checkpointMaxWrites(final int checkpointMaxWrites) { - return mutate(mutable -> mutable.checkpointMaxWrites = checkpointMaxWrites); + this.checkpointMaxWrites = checkpointMaxWrites; + return this; } @Override public Builder checkpointRetry(final boolean checkpointRetry) { - return mutate(mutable -> mutable.checkpointRetry = checkpointRetry); + this.checkpointRetry = checkpointRetry; + return this; } @Override public Settings build() { return Settings.ensureValid(new SettingsImpl(this)); } - - private Builder mutate(final Consumer mutator) { - final MutableBuilder mutableBuilder = new MutableBuilder(); - mutator.accept(mutableBuilder); - return mutableBuilder.toBuilder(); - } - - /** - * A {@link MutableBuilder} is an internal detail of the immutable {@link BuilderImpl} - * that allows its private {@link BuilderImpl#mutate} to work with a temporary mutable copy. - */ - private class MutableBuilder { - protected String dirForFiles = BuilderImpl.this.dirForFiles; - protected Class elementClass = BuilderImpl.this.elementClass; - protected int capacity = BuilderImpl.this.capacity; - protected long queueMaxBytes = BuilderImpl.this.queueMaxBytes; - protected int maxUnread = BuilderImpl.this.maxUnread; - protected int checkpointMaxAcks = BuilderImpl.this.checkpointMaxAcks; - protected int checkpointMaxWrites = BuilderImpl.this.checkpointMaxWrites; - protected boolean checkpointRetry = BuilderImpl.this.checkpointRetry; - - Builder toBuilder() { - return new BuilderImpl(this); - } - } } }