Skip to content

Commit 23215f7

Browse files
jsvdmergify[bot]
authored andcommitted
make queue_max_bytes a long again allowing queues bigger than 2GB (#18366)
The queue factory setting "queueMaxBytes" used to be a long [link](https://github.com/elastic/logstash/pull/18180/files#diff-a377851670e06cd113751aa73b726b446dfb721d03d6979b9195afff7facd9f5L60): ```ruby long queueMaxBytes = RubyFixnum.num2long(args[6]); ``` But with the refactoring of #18180, it [became an Integer by mistake](https://github.com/elastic/logstash/pull/18180/files#diff-01eec670d2beabcd43041ff7fca8bc907eb227471dce886bb757b6414a655429R125): ```java .queueMaxBytes(getSetting(context, settings, QUEUE_MAX_BYTES).toJava(Integer.class)) ``` Which prevents queues being bigger than 2^31-1, causing crashes to anyone setting their PQ to 2GB or more: ```yaml queue.max_bytes: 2147483647 # 2GB-1 OK queue.max_bytes: 2147483648 # 2GB FAIL ``` This change makes queueMaxBytes a Long again, with a test confirming that queues larger than 2ˆ31-1 (max int) can be created. Without this change the new test fails: ``` ❯ bin/rspec logstash-core/spec/logstash/queue_factory_spec.rb ..F. Failures: 1) LogStash::QueueFactory when `queue.type` is `persisted` when queue.max_bytes is larger than Java int does not raise error Failure/Error: expect { queue = subject.create(settings) }.to_not raise_error expected no Exception, got #<RangeError: too big for int: 2147483648> with backtrace: # org/logstash/ackedqueue/QueueFactoryExt.java:97:in `create' # org/logstash/ackedqueue/QueueFactoryExt.java:88:in `create' # ./logstash-core/spec/logstash/queue_factory_spec.rb:85:in `block in <main>' # ./logstash-core/spec/logstash/queue_factory_spec.rb:85:in `block in <main>' # ./lib/bootstrap/rspec.rb:36:in `<main>' # ./logstash-core/spec/logstash/queue_factory_spec.rb:85:in `block in <main>' # ./lib/bootstrap/rspec.rb:36:in `<main>' Finished in 0.03879 seconds (files took 0.03787 seconds to load) 4 examples, 1 failure Failed examples: rspec ./logstash-core/spec/logstash/queue_factory_spec.rb:84 # LogStash::QueueFactory when `queue.type` is `persisted` when queue.max_bytes is larger than Java int does not raise error ``` But with the change it passes: ``` ❯ bin/rspec logstash-core/spec/logstash/queue_factory_spec.rb -fd LogStash::QueueFactory when `queue.type` is `persisted` returns a `WrappedAckedQueue` per pipeline id subdirectory creation creates a queue directory based on the pipeline id when queue.max_bytes is larger than Java int does not raise error when `queue.type` is `memory` returns a `WrappedSynchronousQueue` Finished in 0.03292 seconds (files took 0.04052 seconds to load) 4 examples, 0 failures ``` (cherry picked from commit 6c48e51)
1 parent 959a37f commit 23215f7

File tree

2 files changed

+20
-6
lines changed

2 files changed

+20
-6
lines changed

logstash-core/spec/logstash/queue_factory_spec.rb

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,22 +50,23 @@
5050
subject { described_class }
5151

5252
context "when `queue.type` is `persisted`" do
53+
let(:queue_path) { ::File.join(settings.get("path.queue"), pipeline_id) }
54+
5355
before do
5456
settings.set("queue.type", "persisted")
5557
end
5658

59+
after(:each) do
60+
FileUtils.rm_rf(queue_path)
61+
end
62+
5763
it "returns a `WrappedAckedQueue`" do
5864
queue = subject.create(settings)
5965
expect(queue).to be_kind_of(LogStash::WrappedAckedQueue)
6066
queue.close
6167
end
6268

6369
describe "per pipeline id subdirectory creation" do
64-
let(:queue_path) { ::File.join(settings.get("path.queue"), pipeline_id) }
65-
66-
after :each do
67-
FileUtils.rm_rf(queue_path)
68-
end
6970

7071
it "creates a queue directory based on the pipeline id" do
7172
expect(Dir.exist?(queue_path)).to be_falsey
@@ -74,6 +75,19 @@
7475
queue.close
7576
end
7677
end
78+
79+
context "when queue.max_bytes is larger than Java int" do
80+
let(:large_queue_max_bytes) { 2**31 } # 2^31 bytes, bigger than 2^31-1 int limit
81+
before(:each) do
82+
settings.set("queue.max_bytes", large_queue_max_bytes)
83+
end
84+
it "does not raise error" do
85+
queue = nil
86+
expect { queue = subject.create(settings) }.to_not raise_error
87+
expect(queue.queue.max_size_in_bytes).to eq(large_queue_max_bytes)
88+
queue.close
89+
end
90+
end
7791
end
7892

7993
context "when `queue.type` is `memory`" do

logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ private static Settings extractQueueSettings(final IRubyObject settings) {
160160
.checkpointMaxWrites(getSetting(context, settings, QUEUE_CHECKPOINT_WRITES).toJava(Integer.class))
161161
.checkpointMaxAcks(getSetting(context, settings, QUEUE_CHECKPOINT_ACKS).toJava(Integer.class))
162162
.checkpointRetry(getSetting(context, settings, QUEUE_CHECKPOINT_RETRY).isTrue())
163-
.queueMaxBytes(getSetting(context, settings, QUEUE_MAX_BYTES).toJava(Integer.class))
163+
.queueMaxBytes(getSetting(context, settings, QUEUE_MAX_BYTES).toJava(Long.class))
164164
.compressionCodecFactory(extractConfiguredCodec(settings))
165165
.build();
166166
}

0 commit comments

Comments
 (0)